Add citus_schema_move() function (#7180)

Add citus_schema_move() that can be used to move tenant tables within a distributed
schema to another node. The function has two variations as simple wrappers around
citus_move_shard_placement() and citus_move_shard_placement_with_nodeid() respectively.
They pick a shard that belongs to the given tenant schema and resolve the source node
that contain the shards under given tenant schema. Hence their signatures are quite
similar to underlying functions:

```sql
-- citus_schema_move(), using target node name and node port
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
	schema_id regnamespace,
	target_node_name text,
	target_node_port integer,
	shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move$$;

-- citus_schema_move(), using target node id
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
	schema_id regnamespace,
	target_node_id integer,
	shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
```
pull/7191/head
Onur Tirtir 2023-09-08 12:03:53 +03:00 committed by GitHub
parent 8894c76ec0
commit d628a4c21a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 631 additions and 4 deletions

View File

@ -59,6 +59,7 @@
#include "distributed/replication_origin_session_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_utils.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h"

View File

@ -19,6 +19,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"

View File

@ -21,6 +21,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/shard_transfer.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/worker_shard_visibility.h"
#include "utils/builtins.h"
@ -29,6 +30,16 @@
#include "utils/syscache.h"
/* return value of CreateCitusMoveSchemaParams() */
typedef struct
{
uint64 anchorShardId;
uint32 sourceNodeId;
char *sourceNodeName;
uint32 sourceNodePort;
} CitusMoveSchemaParams;
static void UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName);
static List * SchemaGetNonShardTableIdList(Oid schemaId);
static void EnsureSchemaCanBeDistributed(Oid schemaId, List *schemaTableIdList);
@ -36,10 +47,14 @@ static void EnsureTenantSchemaNameAllowed(Oid schemaId);
static void EnsureTableKindSupportedForTenantSchema(Oid relationId);
static void EnsureFKeysForTenantTable(Oid relationId);
static void EnsureSchemaExist(Oid schemaId);
static CitusMoveSchemaParams * CreateCitusMoveSchemaParams(Oid schemaId);
static uint64 TenantSchemaPickAnchorShardId(Oid schemaId);
/* controlled via citus.enable_schema_based_sharding GUC */
bool EnableSchemaBasedSharding = false;
const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
"undistribute_table",
"alter_distributed_table",
@ -52,6 +67,8 @@ const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
PG_FUNCTION_INFO_V1(citus_internal_unregister_tenant_schema_globally);
PG_FUNCTION_INFO_V1(citus_schema_distribute);
PG_FUNCTION_INFO_V1(citus_schema_undistribute);
PG_FUNCTION_INFO_V1(citus_schema_move);
PG_FUNCTION_INFO_V1(citus_schema_move_with_nodeid);
/*
* ShouldUseSchemaBasedSharding returns true if schema given name should be
@ -757,6 +774,139 @@ citus_schema_undistribute(PG_FUNCTION_ARGS)
}
/*
* citus_schema_move moves the shards that belong to given distributed tenant
* schema from one node to the other node by using citus_move_shard_placement().
*/
Datum
citus_schema_move(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
Oid schemaId = PG_GETARG_OID(0);
CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);
DirectFunctionCall6(citus_move_shard_placement,
UInt64GetDatum(params->anchorShardId),
CStringGetTextDatum(params->sourceNodeName),
UInt32GetDatum(params->sourceNodePort),
PG_GETARG_DATUM(1),
PG_GETARG_DATUM(2),
PG_GETARG_DATUM(3));
PG_RETURN_VOID();
}
/*
* citus_schema_move_with_nodeid does the same as citus_schema_move(), but
* accepts node id as parameter instead of hostname and port, hence uses
* citus_move_shard_placement_with_nodeid().
*/
Datum
citus_schema_move_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
Oid schemaId = PG_GETARG_OID(0);
CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);
DirectFunctionCall4(citus_move_shard_placement_with_nodeid,
UInt64GetDatum(params->anchorShardId),
UInt32GetDatum(params->sourceNodeId),
PG_GETARG_DATUM(1),
PG_GETARG_DATUM(2));
PG_RETURN_VOID();
}
/*
* CreateCitusMoveSchemaParams is a helper function for
* citus_schema_move() and citus_schema_move_with_nodeid()
* that validates input schema and returns the parameters to be used in underlying
* shard transfer functions.
*/
static CitusMoveSchemaParams *
CreateCitusMoveSchemaParams(Oid schemaId)
{
EnsureSchemaExist(schemaId);
EnsureSchemaOwner(schemaId);
if (!IsTenantSchema(schemaId))
{
ereport(ERROR, (errmsg("schema %s is not a distributed schema",
get_namespace_name(schemaId))));
}
uint64 anchorShardId = TenantSchemaPickAnchorShardId(schemaId);
if (anchorShardId == INVALID_SHARD_ID)
{
ereport(ERROR, (errmsg("cannot move distributed schema %s because it is empty",
get_namespace_name(schemaId))));
}
uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
uint32 sourceNodeId = SingleShardTableColocationNodeId(colocationId);
bool missingOk = false;
WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
CitusMoveSchemaParams *params = palloc0(sizeof(CitusMoveSchemaParams));
params->anchorShardId = anchorShardId;
params->sourceNodeId = sourceNodeId;
params->sourceNodeName = sourceNode->workerName;
params->sourceNodePort = sourceNode->workerPort;
return params;
}
/*
* TenantSchemaPickAnchorShardId returns the id of one of the shards
* created in given tenant schema.
*
* Returns INVALID_SHARD_ID if the schema was initially empty or if it's not
* a tenant schema.
*
* Throws an error if all the tables in the schema are concurrently dropped.
*/
static uint64
TenantSchemaPickAnchorShardId(Oid schemaId)
{
uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
List *tablesInSchema = ColocationGroupTableList(colocationId, 0);
if (list_length(tablesInSchema) == 0)
{
return INVALID_SHARD_ID;
}
Oid relationId = InvalidOid;
foreach_oid(relationId, tablesInSchema)
{
/*
* Make sure the relation isn't dropped for the remainder of
* the transaction.
*/
LockRelationOid(relationId, AccessShareLock);
/*
* The relation might have been dropped just before we locked it.
* Let's look it up.
*/
Relation relation = RelationIdGetRelation(relationId);
if (RelationIsValid(relation))
{
/* relation still exists, we can use it */
RelationClose(relation);
return GetFirstShardId(relationId);
}
}
ereport(ERROR, (errmsg("tables in schema %s are concurrently dropped",
get_namespace_name(schemaId))));
}
/*
* ErrorIfTenantTable errors out with the given operation name,
* if the given relation is a tenant table.

View File

@ -8,3 +8,5 @@
#include "udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql"
#include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql"
#include "udfs/citus_schema_move/12.1-1.sql"

View File

@ -13,3 +13,12 @@ DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
placement_id bigint
);
DROP FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace, target_node_name text, target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode
);
DROP FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace, target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode
);

View File

@ -0,0 +1,29 @@
-- citus_schema_move, using target node name and node port
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move$$;
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a distributed schema to given node';
-- citus_schema_move, using target node id
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a distributed schema to given node';

View File

@ -0,0 +1,29 @@
-- citus_schema_move, using target node name and node port
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move$$;
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a distributed schema to given node';
-- citus_schema_move, using target node id
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace,
target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a distributed schema to given node';

View File

@ -818,7 +818,6 @@ extern void UpdateAutoConvertedForConnectedRelations(List *relationId, bool
/* schema_based_sharding.c */
extern bool ShouldUseSchemaBasedSharding(char *schemaName);
extern bool ShouldCreateTenantSchemaTable(Oid relationId);
extern bool IsTenantSchema(Oid schemaId);
extern void EnsureTenantTable(Oid relationId, char *operationName);
extern void ErrorIfIllegalPartitioningInTenantSchema(Oid parentRelationId,
Oid partitionRelationId);

View File

@ -12,6 +12,9 @@
#include "distributed/shard_rebalancer.h"
#include "nodes/pg_list.h"
extern Datum citus_move_shard_placement(PG_FUNCTION_ARGS);
extern Datum citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS);
typedef enum
{
SHARD_TRANSFER_INVALID_FIRST = 0,

View File

@ -0,0 +1,224 @@
CREATE SCHEMA citus_schema_move;
SET search_path TO citus_schema_move;
SET citus.next_shard_id TO 2220000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
-- Due to a race condition that happens in TransferShards() when the same shard id
-- is used to create the same shard on a different worker node, need to call
-- citus_cleanup_orphaned_resources() to clean up any orphaned resources before
-- running the tests.
--
-- See https://github.com/citusdata/citus/pull/7180#issuecomment-1706786615.
CALL citus_cleanup_orphaned_resources();
SET client_min_messages TO NOTICE;
-- test null input, should be no-op
SELECT citus_schema_move(schema_id=>null, target_node_name=>null, target_node_port=>null, shard_transfer_mode=>null);
citus_schema_move
---------------------------------------------------------------------
(1 row)
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
citus_schema_move
---------------------------------------------------------------------
(1 row)
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
citus_schema_move
---------------------------------------------------------------------
(1 row)
SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA s1;
-- test invalid schema
SELECT citus_schema_move('no_such_schema', 'dummy_node_name', 1234);
ERROR: schema "no_such_schema" does not exist
SELECT citus_schema_move('no_such_schema', 1234);
ERROR: schema "no_such_schema" does not exist
-- test regular schema
SELECT citus_schema_move('citus_schema_move', 'dummy_node_name', 1234);
ERROR: schema citus_schema_move is not a distributed schema
SELECT citus_schema_move('citus_schema_move', 1234);
ERROR: schema citus_schema_move is not a distributed schema
-- test empty distributed schema
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
ERROR: cannot move distributed schema s1 because it is empty
SELECT citus_schema_move('s1', 1234);
ERROR: cannot move distributed schema s1 because it is empty
CREATE TABLE s1.t1 (a int);
-- test invalid node name / port / id
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
ERROR: Moving shards to a non-existing node is not supported
HINT: Add the target node via SELECT citus_add_node('dummy_node_name', 1234);
SELECT citus_schema_move('s1', 1234);
ERROR: node with node id 1234 could not be found
-- errors due to missing pkey / replicate ident.
SELECT citus_schema_move('s1', nodename, nodeport) FROM pg_dist_node
WHERE isactive AND shouldhaveshards AND noderole='primary' AND
(nodename, nodeport) NOT IN (
SELECT nodename, nodeport FROM citus_shards WHERE table_name = 's1.t1'::regclass
);
ERROR: cannot use logical replication to transfer shards of the relation t1 since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
-- errors as we try to move the schema to the same node
SELECT citus_schema_move('s1', nodename, nodeport, 'block_writes')
FROM citus_shards
JOIN pg_dist_node USING (nodename, nodeport)
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
ERROR: cannot move shard to the same node
SELECT citus_schema_move('s1', nodeid, 'block_writes')
FROM citus_shards
JOIN pg_dist_node USING (nodename, nodeport)
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
ERROR: cannot move shard to the same node
-- returns id, host name and host port of a non-coordinator node that given schema can be moved to
CREATE OR REPLACE FUNCTION get_non_coord_candidate_node_for_schema_move(
schema_id regnamespace)
RETURNS TABLE (nodeid integer, nodename text, nodeport integer)
SET search_path TO 'pg_catalog, public'
AS $func$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_dist_schema WHERE schemaid = schema_id)
THEN
RAISE EXCEPTION '% is not a distributed schema', schema_id;
END IF;
CREATE TEMP TABLE nodeid_nodename_nodeport ON COMMIT DROP AS
SELECT pdn1.nodeid, pdn1.nodename, pdn1.nodeport
FROM pg_dist_node pdn1
WHERE isactive AND shouldhaveshards AND noderole='primary' AND groupid != 0 AND
(pdn1.nodename, pdn1.nodeport) NOT IN (
SELECT cs.nodename, cs.nodeport
FROM citus_shards cs
JOIN pg_dist_node pdn2
ON cs.nodename = pdn2.nodename AND cs.nodeport = pdn2.nodeport
WHERE pdn2.noderole='primary' AND starts_with(table_name::text, schema_id::text)
);
IF NOT EXISTS (SELECT 1 FROM nodeid_nodename_nodeport)
THEN
RAISE EXCEPTION 'could not determine a node to move the schema to';
END IF;
RETURN QUERY SELECT * FROM nodeid_nodename_nodeport LIMIT 1;
END;
$func$ LANGUAGE plpgsql;
CREATE TABLE s1.t2 (a int);
-- move the schema to a different node
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
citus_schema_move
---------------------------------------------------------------------
(1 row)
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
SELECT citus_schema_move('s1', :s1_new_nodeid, 'block_writes');
citus_schema_move
---------------------------------------------------------------------
(1 row)
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
?column?
---------------------------------------------------------------------
t
(1 row)
-- move the schema to the coordinator
SELECT citus_schema_move('s1', 'localhost', :master_port, 'block_writes');
citus_schema_move
---------------------------------------------------------------------
(1 row)
SELECT ('localhost', :master_port) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
?column?
---------------------------------------------------------------------
t
(1 row)
-- move the schema away from the coordinator
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
citus_schema_move
---------------------------------------------------------------------
(1 row)
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE USER tenantuser superuser;
SET ROLE tenantuser;
CREATE SCHEMA s2;
CREATE TABLE s2.t1 (a int);
CREATE TABLE s2.t2 (a int);
CREATE USER regularuser;
SET ROLE regularuser;
-- throws an error as the user is not the owner of the schema
SELECT citus_schema_move('s2', 'dummy_node', 1234);
ERROR: must be owner of schema s2
-- assign all tables to regularuser
RESET ROLE;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$);
result
---------------------------------------------------------------------
REASSIGN OWNED
REASSIGN OWNED
REASSIGN OWNED
(3 rows)
GRANT USAGE ON SCHEMA citus_schema_move TO regularuser;
SET ROLE regularuser;
SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s2') \gset
SELECT citus_schema_move('s2', :s2_new_nodename, :s2_new_nodeport, 'force_logical');
citus_schema_move
---------------------------------------------------------------------
(1 row)
SELECT (:s2_new_nodename, :s2_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's2'::text));
?column?
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA s2 CASCADE;
SET client_min_messages TO NOTICE;
RESET ROLE;
REVOKE USAGE ON SCHEMA citus_schema_move FROM regularuser;
DROP ROLE regularuser, tenantuser;
RESET citus.enable_schema_based_sharding;
SET client_min_messages TO WARNING;
DROP SCHEMA citus_schema_move, s1 CASCADE;

View File

@ -1404,7 +1404,9 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal_delete_placement_metadata(bigint) void
| function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void
| function citus_pause_node_within_txn(integer,boolean,integer) void
(3 rows)
| function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode) void
| function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode) void
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -118,6 +118,8 @@ ORDER BY 1;
function citus_remove_node(text,integer)
function citus_run_local_command(text)
function citus_schema_distribute(regnamespace)
function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode)
function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode)
function citus_schema_undistribute(regnamespace)
function citus_server_id()
function citus_set_coordinator_host(text,integer,noderole,name)
@ -341,5 +343,5 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(331 rows)
(333 rows)

View File

@ -307,6 +307,7 @@ test: mx_regular_user
test: citus_locks
test: global_cancel
test: sequences_owned_by
test: citus_schema_move
test: remove_coordinator
# ----------

View File

@ -0,0 +1,175 @@
CREATE SCHEMA citus_schema_move;
SET search_path TO citus_schema_move;
SET citus.next_shard_id TO 2220000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
-- Due to a race condition that happens in TransferShards() when the same shard id
-- is used to create the same shard on a different worker node, need to call
-- citus_cleanup_orphaned_resources() to clean up any orphaned resources before
-- running the tests.
--
-- See https://github.com/citusdata/citus/pull/7180#issuecomment-1706786615.
CALL citus_cleanup_orphaned_resources();
SET client_min_messages TO NOTICE;
-- test null input, should be no-op
SELECT citus_schema_move(schema_id=>null, target_node_name=>null, target_node_port=>null, shard_transfer_mode=>null);
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA s1;
-- test invalid schema
SELECT citus_schema_move('no_such_schema', 'dummy_node_name', 1234);
SELECT citus_schema_move('no_such_schema', 1234);
-- test regular schema
SELECT citus_schema_move('citus_schema_move', 'dummy_node_name', 1234);
SELECT citus_schema_move('citus_schema_move', 1234);
-- test empty distributed schema
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
SELECT citus_schema_move('s1', 1234);
CREATE TABLE s1.t1 (a int);
-- test invalid node name / port / id
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
SELECT citus_schema_move('s1', 1234);
-- errors due to missing pkey / replicate ident.
SELECT citus_schema_move('s1', nodename, nodeport) FROM pg_dist_node
WHERE isactive AND shouldhaveshards AND noderole='primary' AND
(nodename, nodeport) NOT IN (
SELECT nodename, nodeport FROM citus_shards WHERE table_name = 's1.t1'::regclass
);
-- errors as we try to move the schema to the same node
SELECT citus_schema_move('s1', nodename, nodeport, 'block_writes')
FROM citus_shards
JOIN pg_dist_node USING (nodename, nodeport)
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
SELECT citus_schema_move('s1', nodeid, 'block_writes')
FROM citus_shards
JOIN pg_dist_node USING (nodename, nodeport)
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
-- returns id, host name and host port of a non-coordinator node that given schema can be moved to
CREATE OR REPLACE FUNCTION get_non_coord_candidate_node_for_schema_move(
schema_id regnamespace)
RETURNS TABLE (nodeid integer, nodename text, nodeport integer)
SET search_path TO 'pg_catalog, public'
AS $func$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_dist_schema WHERE schemaid = schema_id)
THEN
RAISE EXCEPTION '% is not a distributed schema', schema_id;
END IF;
CREATE TEMP TABLE nodeid_nodename_nodeport ON COMMIT DROP AS
SELECT pdn1.nodeid, pdn1.nodename, pdn1.nodeport
FROM pg_dist_node pdn1
WHERE isactive AND shouldhaveshards AND noderole='primary' AND groupid != 0 AND
(pdn1.nodename, pdn1.nodeport) NOT IN (
SELECT cs.nodename, cs.nodeport
FROM citus_shards cs
JOIN pg_dist_node pdn2
ON cs.nodename = pdn2.nodename AND cs.nodeport = pdn2.nodeport
WHERE pdn2.noderole='primary' AND starts_with(table_name::text, schema_id::text)
);
IF NOT EXISTS (SELECT 1 FROM nodeid_nodename_nodeport)
THEN
RAISE EXCEPTION 'could not determine a node to move the schema to';
END IF;
RETURN QUERY SELECT * FROM nodeid_nodename_nodeport LIMIT 1;
END;
$func$ LANGUAGE plpgsql;
CREATE TABLE s1.t2 (a int);
-- move the schema to a different node
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
SELECT citus_schema_move('s1', :s1_new_nodeid, 'block_writes');
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
-- move the schema to the coordinator
SELECT citus_schema_move('s1', 'localhost', :master_port, 'block_writes');
SELECT ('localhost', :master_port) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
-- move the schema away from the coordinator
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
CREATE USER tenantuser superuser;
SET ROLE tenantuser;
CREATE SCHEMA s2;
CREATE TABLE s2.t1 (a int);
CREATE TABLE s2.t2 (a int);
CREATE USER regularuser;
SET ROLE regularuser;
-- throws an error as the user is not the owner of the schema
SELECT citus_schema_move('s2', 'dummy_node', 1234);
-- assign all tables to regularuser
RESET ROLE;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$);
GRANT USAGE ON SCHEMA citus_schema_move TO regularuser;
SET ROLE regularuser;
SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s2') \gset
SELECT citus_schema_move('s2', :s2_new_nodename, :s2_new_nodeport, 'force_logical');
SELECT (:s2_new_nodename, :s2_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's2'::text));
SET client_min_messages TO WARNING;
DROP SCHEMA s2 CASCADE;
SET client_min_messages TO NOTICE;
RESET ROLE;
REVOKE USAGE ON SCHEMA citus_schema_move FROM regularuser;
DROP ROLE regularuser, tenantuser;
RESET citus.enable_schema_based_sharding;
SET client_min_messages TO WARNING;
DROP SCHEMA citus_schema_move, s1 CASCADE;