diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c
index fbe7cfe07..8c2736a28 100644
--- a/src/backend/distributed/commands/alter_table.c
+++ b/src/backend/distributed/commands/alter_table.c
@@ -59,6 +59,7 @@
 #include "distributed/replication_origin_session_utils.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/shard_utils.h"
+#include "distributed/tenant_schema_metadata.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
 #include "executor/spi.h"
diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c
index 24dd8e892..26579cd60 100644
--- a/src/backend/distributed/commands/drop_distributed_table.c
+++ b/src/backend/distributed/commands/drop_distributed_table.c
@@ -19,6 +19,7 @@
 #include "distributed/coordinator_protocol.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_partitioning_utils.h"
+#include "distributed/tenant_schema_metadata.h"
 #include "distributed/worker_transaction.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
diff --git a/src/backend/distributed/commands/schema_based_sharding.c b/src/backend/distributed/commands/schema_based_sharding.c
index b717cb5ae..65d2b8127 100644
--- a/src/backend/distributed/commands/schema_based_sharding.c
+++ b/src/backend/distributed/commands/schema_based_sharding.c
@@ -21,6 +21,7 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/metadata/distobject.h"
 #include "distributed/multi_partitioning_utils.h"
+#include "distributed/shard_transfer.h"
 #include "distributed/tenant_schema_metadata.h"
 #include "distributed/worker_shard_visibility.h"
 #include "utils/builtins.h"
@@ -29,6 +30,16 @@
 #include "utils/syscache.h"
 
 
+/* return value of CreateCitusMoveSchemaParams() */
+typedef struct
+{
+	uint64 anchorShardId;
+	uint32 sourceNodeId;
+	char *sourceNodeName;
+	uint32 sourceNodePort;
+} CitusMoveSchemaParams;
+
+
 static void UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName);
 static List * SchemaGetNonShardTableIdList(Oid schemaId);
 static void EnsureSchemaCanBeDistributed(Oid schemaId, List *schemaTableIdList);
@@ -36,10 +47,14 @@ static void EnsureTenantSchemaNameAllowed(Oid schemaId);
 static void EnsureTableKindSupportedForTenantSchema(Oid relationId);
 static void EnsureFKeysForTenantTable(Oid relationId);
 static void EnsureSchemaExist(Oid schemaId);
+static CitusMoveSchemaParams * CreateCitusMoveSchemaParams(Oid schemaId);
+static uint64 TenantSchemaPickAnchorShardId(Oid schemaId);
+
 
 /* controlled via citus.enable_schema_based_sharding GUC */
 bool EnableSchemaBasedSharding = false;
 
+
 const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
 	"undistribute_table",
 	"alter_distributed_table",
@@ -52,6 +67,8 @@ const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
 PG_FUNCTION_INFO_V1(citus_internal_unregister_tenant_schema_globally);
 PG_FUNCTION_INFO_V1(citus_schema_distribute);
 PG_FUNCTION_INFO_V1(citus_schema_undistribute);
+PG_FUNCTION_INFO_V1(citus_schema_move);
+PG_FUNCTION_INFO_V1(citus_schema_move_with_nodeid);
 
 /*
  * ShouldUseSchemaBasedSharding returns true if schema given name should be
@@ -757,6 +774,139 @@ citus_schema_undistribute(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * citus_schema_move moves the shards that belong to the given distributed
+ * tenant schema from one node to another by using citus_move_shard_placement().
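+ *
+ * For illustration only (the schema and node names here are hypothetical),
+ * the SQL-level call
+ *
+ *   SELECT citus_schema_move('tenant_1', 'worker-2', 5432, 'block_writes');
+ *
+ * picks one "anchor" shard from tenant_1's colocation group and hands it to
+ * citus_move_shard_placement(), which moves that shard together with all
+ * shards colocated with it, i.e., every shard in the schema.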
+ */
+Datum
+citus_schema_move(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+	EnsureCoordinator();
+
+	Oid schemaId = PG_GETARG_OID(0);
+	CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);
+
+	DirectFunctionCall6(citus_move_shard_placement,
+						UInt64GetDatum(params->anchorShardId),
+						CStringGetTextDatum(params->sourceNodeName),
+						UInt32GetDatum(params->sourceNodePort),
+						PG_GETARG_DATUM(1),
+						PG_GETARG_DATUM(2),
+						PG_GETARG_DATUM(3));
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * citus_schema_move_with_nodeid does the same as citus_schema_move(), but
+ * accepts a node id as parameter instead of hostname and port, and hence
+ * uses citus_move_shard_placement_with_nodeid().
+ */
+Datum
+citus_schema_move_with_nodeid(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+	EnsureCoordinator();
+
+	Oid schemaId = PG_GETARG_OID(0);
+	CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);
+
+	DirectFunctionCall4(citus_move_shard_placement_with_nodeid,
+						UInt64GetDatum(params->anchorShardId),
+						UInt32GetDatum(params->sourceNodeId),
+						PG_GETARG_DATUM(1),
+						PG_GETARG_DATUM(2));
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * CreateCitusMoveSchemaParams is a helper function for
+ * citus_schema_move() and citus_schema_move_with_nodeid()
+ * that validates the input schema and returns the parameters to be used
+ * by the underlying shard transfer functions.
+ */
+static CitusMoveSchemaParams *
+CreateCitusMoveSchemaParams(Oid schemaId)
+{
+	EnsureSchemaExist(schemaId);
+	EnsureSchemaOwner(schemaId);
+
+	if (!IsTenantSchema(schemaId))
+	{
+		ereport(ERROR, (errmsg("schema %s is not a distributed schema",
+							   get_namespace_name(schemaId))));
+	}
+
+	uint64 anchorShardId = TenantSchemaPickAnchorShardId(schemaId);
+	if (anchorShardId == INVALID_SHARD_ID)
+	{
+		ereport(ERROR, (errmsg("cannot move distributed schema %s because it is empty",
+							   get_namespace_name(schemaId))));
+	}
+
+	uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
+	uint32 sourceNodeId = SingleShardTableColocationNodeId(colocationId);
+
+	bool missingOk = false;
+	WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
+
+	CitusMoveSchemaParams *params = palloc0(sizeof(CitusMoveSchemaParams));
+	params->anchorShardId = anchorShardId;
+	params->sourceNodeId = sourceNodeId;
+	params->sourceNodeName = sourceNode->workerName;
+	params->sourceNodePort = sourceNode->workerPort;
+	return params;
+}
+
+
+/*
+ * TenantSchemaPickAnchorShardId returns the id of one of the shards
+ * created in the given tenant schema.
+ *
+ * Returns INVALID_SHARD_ID if the schema was initially empty or if it's not
+ * a tenant schema.
+ *
+ * Throws an error if all the tables in the schema are concurrently dropped.
+ */
+static uint64
+TenantSchemaPickAnchorShardId(Oid schemaId)
+{
+	uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
+	List *tablesInSchema = ColocationGroupTableList(colocationId, 0);
+	if (list_length(tablesInSchema) == 0)
+	{
+		return INVALID_SHARD_ID;
+	}
+
+	Oid relationId = InvalidOid;
+	foreach_oid(relationId, tablesInSchema)
+	{
+		/*
+		 * Make sure the relation isn't dropped for the remainder of
+		 * the transaction.
+		 */
+		LockRelationOid(relationId, AccessShareLock);
+
+		/*
+		 * The relation might have been dropped just before we locked it.
+		 * Let's look it up.
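+		 * RelationIdGetRelation() returns NULL for a relation that no
+		 * longer exists, so a valid relcache entry here proves that the
+		 * table survived.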
+		 */
+		Relation relation = RelationIdGetRelation(relationId);
+		if (RelationIsValid(relation))
+		{
+			/* relation still exists, we can use it */
+			RelationClose(relation);
+			return GetFirstShardId(relationId);
+		}
+	}
+
+	ereport(ERROR, (errmsg("tables in schema %s are concurrently dropped",
+						   get_namespace_name(schemaId))));
+}
+
+
 /*
  * ErrorIfTenantTable errors out with the given operation name,
  * if the given relation is a tenant table.
diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
index b904b6a83..4e2a515a3 100644
--- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
+++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
@@ -8,3 +8,5 @@
 #include "udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql"
 #include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql"
+
+#include "udfs/citus_schema_move/12.1-1.sql"
diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql
index 82ff28a6b..6f58b2f54 100644
--- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql
@@ -13,3 +13,12 @@ DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
     placement_id bigint
 );
+DROP FUNCTION pg_catalog.citus_schema_move(
+    schema_id regnamespace, target_node_name text, target_node_port integer,
+    shard_transfer_mode citus.shard_transfer_mode
+);
+
+DROP FUNCTION pg_catalog.citus_schema_move(
+    schema_id regnamespace, target_node_id integer,
+    shard_transfer_mode citus.shard_transfer_mode
+);
diff --git a/src/backend/distributed/sql/udfs/citus_schema_move/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_schema_move/12.1-1.sql
new file mode 100644
index 000000000..8ca7e703f
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_schema_move/12.1-1.sql
@@ -0,0 +1,29 @@
+-- citus_schema_move, using target node name and node port
+CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_name text,
+	target_node_port integer,
+	shard_transfer_mode citus.shard_transfer_mode default 'auto')
+RETURNS void
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME', $$citus_schema_move$$;
+COMMENT ON FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_name text,
+	target_node_port integer,
+	shard_transfer_mode citus.shard_transfer_mode)
+IS 'move a distributed schema to the given node';
+
+-- citus_schema_move, using target node id
+CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_id integer,
+	shard_transfer_mode citus.shard_transfer_mode default 'auto')
+RETURNS void
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
+COMMENT ON FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_id integer,
+	shard_transfer_mode citus.shard_transfer_mode)
+IS 'move a distributed schema to the given node';
diff --git a/src/backend/distributed/sql/udfs/citus_schema_move/latest.sql b/src/backend/distributed/sql/udfs/citus_schema_move/latest.sql
new file mode 100644
index 000000000..8ca7e703f
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_schema_move/latest.sql
@@ -0,0 +1,29 @@
+-- citus_schema_move, using target node name and node port
+CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_name text,
+	target_node_port integer,
+	shard_transfer_mode citus.shard_transfer_mode default 'auto')
+RETURNS void
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME', $$citus_schema_move$$;
+COMMENT ON FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_name text,
+	target_node_port integer,
+	shard_transfer_mode citus.shard_transfer_mode)
+IS 'move a distributed schema to the given node';
+
+-- citus_schema_move, using target node id
+CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_id integer,
+	shard_transfer_mode citus.shard_transfer_mode default 'auto')
+RETURNS void
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
+COMMENT ON FUNCTION pg_catalog.citus_schema_move(
+	schema_id regnamespace,
+	target_node_id integer,
+	shard_transfer_mode citus.shard_transfer_mode)
+IS 'move a distributed schema to the given node';
diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h
index 969119bfe..c120f9429 100644
--- a/src/include/distributed/commands.h
+++ b/src/include/distributed/commands.h
@@ -818,7 +818,6 @@ extern void UpdateAutoConvertedForConnectedRelations(List *relationId, bool
 /* schema_based_sharding.c */
 extern bool ShouldUseSchemaBasedSharding(char *schemaName);
 extern bool ShouldCreateTenantSchemaTable(Oid relationId);
-extern bool IsTenantSchema(Oid schemaId);
 extern void EnsureTenantTable(Oid relationId, char *operationName);
 extern void ErrorIfIllegalPartitioningInTenantSchema(Oid parentRelationId,
 													 Oid partitionRelationId);
diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h
index a37e5abdb..a6d024a2e 100644
--- a/src/include/distributed/shard_transfer.h
+++ b/src/include/distributed/shard_transfer.h
@@ -12,6 +12,9 @@
 #include "distributed/shard_rebalancer.h"
 #include "nodes/pg_list.h"
 
+extern Datum citus_move_shard_placement(PG_FUNCTION_ARGS);
+extern Datum citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS);
+
 typedef enum
 {
 	SHARD_TRANSFER_INVALID_FIRST = 0,
diff --git a/src/test/regress/expected/citus_schema_move.out b/src/test/regress/expected/citus_schema_move.out
new file mode 100644
index 000000000..160d2062b
--- /dev/null
+++ b/src/test/regress/expected/citus_schema_move.out
@@ -0,0 +1,224 @@
+CREATE SCHEMA citus_schema_move;
+SET search_path TO citus_schema_move;
+SET citus.next_shard_id TO 2220000;
+SET citus.shard_count TO 32;
+SET citus.shard_replication_factor TO 1;
+SET client_min_messages TO WARNING;
+SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+ master_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Due to a race condition that happens in TransferShards() when the same shard id
+-- is used to create the same shard on a different worker node, need to call
+-- citus_cleanup_orphaned_resources() to clean up any orphaned resources before
+-- running the tests.
+--
+-- See https://github.com/citusdata/citus/pull/7180#issuecomment-1706786615.
+CALL citus_cleanup_orphaned_resources(); +SET client_min_messages TO NOTICE; +-- test null input, should be no-op +SELECT citus_schema_move(schema_id=>null, target_node_name=>null, target_node_port=>null, shard_transfer_mode=>null); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SET citus.enable_schema_based_sharding TO ON; +CREATE SCHEMA s1; +-- test invalid schema +SELECT citus_schema_move('no_such_schema', 'dummy_node_name', 1234); +ERROR: schema "no_such_schema" does not exist +SELECT citus_schema_move('no_such_schema', 1234); +ERROR: schema "no_such_schema" does not exist +-- test regular schema +SELECT citus_schema_move('citus_schema_move', 'dummy_node_name', 1234); +ERROR: schema citus_schema_move is not a distributed schema +SELECT citus_schema_move('citus_schema_move', 1234); +ERROR: schema citus_schema_move is not a distributed schema +-- test empty distributed schema +SELECT citus_schema_move('s1', 'dummy_node_name', 1234); +ERROR: cannot move distributed schema s1 because it is empty +SELECT citus_schema_move('s1', 1234); +ERROR: cannot move distributed schema s1 because it is empty +CREATE TABLE s1.t1 (a int); +-- test invalid node name / port / id +SELECT citus_schema_move('s1', 'dummy_node_name', 1234); +ERROR: Moving shards to a non-existing node is not supported +HINT: Add the target node via SELECT citus_add_node('dummy_node_name', 1234); +SELECT citus_schema_move('s1', 1234); +ERROR: node with node id 1234 could not be found +-- errors due to missing pkey / replicate ident. +SELECT citus_schema_move('s1', nodename, nodeport) FROM pg_dist_node +WHERE isactive AND shouldhaveshards AND noderole='primary' AND + (nodename, nodeport) NOT IN ( + SELECT nodename, nodeport FROM citus_shards WHERE table_name = 's1.t1'::regclass + ); +ERROR: cannot use logical replication to transfer shards of the relation t1 since it doesn't have a REPLICA IDENTITY or PRIMARY KEY +DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. +HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'. 
+-- errors as we try to move the schema to the same node +SELECT citus_schema_move('s1', nodename, nodeport, 'block_writes') +FROM citus_shards +JOIN pg_dist_node USING (nodename, nodeport) +WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass; +ERROR: cannot move shard to the same node +SELECT citus_schema_move('s1', nodeid, 'block_writes') +FROM citus_shards +JOIN pg_dist_node USING (nodename, nodeport) +WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass; +ERROR: cannot move shard to the same node +-- returns id, host name and host port of a non-coordinator node that given schema can be moved to +CREATE OR REPLACE FUNCTION get_non_coord_candidate_node_for_schema_move( + schema_id regnamespace) +RETURNS TABLE (nodeid integer, nodename text, nodeport integer) +SET search_path TO 'pg_catalog, public' +AS $func$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_dist_schema WHERE schemaid = schema_id) + THEN + RAISE EXCEPTION '% is not a distributed schema', schema_id; + END IF; + + CREATE TEMP TABLE nodeid_nodename_nodeport ON COMMIT DROP AS + SELECT pdn1.nodeid, pdn1.nodename, pdn1.nodeport + FROM pg_dist_node pdn1 + WHERE isactive AND shouldhaveshards AND noderole='primary' AND groupid != 0 AND + (pdn1.nodename, pdn1.nodeport) NOT IN ( + SELECT cs.nodename, cs.nodeport + FROM citus_shards cs + JOIN pg_dist_node pdn2 + ON cs.nodename = pdn2.nodename AND cs.nodeport = pdn2.nodeport + WHERE pdn2.noderole='primary' AND starts_with(table_name::text, schema_id::text) + ); + + IF NOT EXISTS (SELECT 1 FROM nodeid_nodename_nodeport) + THEN + RAISE EXCEPTION 'could not determine a node to move the schema to'; + END IF; + + RETURN QUERY SELECT * FROM nodeid_nodename_nodeport LIMIT 1; +END; +$func$ LANGUAGE plpgsql; +CREATE TABLE s1.t2 (a int); +-- move the schema to a different node +SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s1') \gset +SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes'); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s1') \gset +SELECT citus_schema_move('s1', :s1_new_nodeid, 'block_writes'); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +-- move the schema to the coordinator +SELECT citus_schema_move('s1', 'localhost', :master_port, 'block_writes'); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SELECT ('localhost', :master_port) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- move the schema away from the coordinator +SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s1') \gset +SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes'); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE USER tenantuser superuser; +SET ROLE tenantuser; +CREATE SCHEMA s2; +CREATE TABLE s2.t1 (a int); +CREATE TABLE s2.t2 (a int); +CREATE USER regularuser; +SET ROLE regularuser; +-- throws an error as the user is not the owner of the schema +SELECT citus_schema_move('s2', 'dummy_node', 1234); +ERROR: must be owner of schema s2 +-- assign all tables to regularuser +RESET ROLE; +SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$); + result +--------------------------------------------------------------------- + REASSIGN OWNED + REASSIGN OWNED + REASSIGN OWNED +(3 rows) + +GRANT USAGE ON SCHEMA citus_schema_move TO regularuser; +SET ROLE regularuser; +SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s2') \gset +SELECT citus_schema_move('s2', :s2_new_nodename, :s2_new_nodeport, 'force_logical'); + citus_schema_move +--------------------------------------------------------------------- + +(1 row) + +SELECT (:s2_new_nodename, :s2_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's2'::text)); + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +SET client_min_messages TO WARNING; +DROP SCHEMA s2 CASCADE; +SET client_min_messages TO NOTICE; +RESET ROLE; +REVOKE USAGE ON SCHEMA citus_schema_move FROM regularuser; +DROP ROLE regularuser, tenantuser; +RESET citus.enable_schema_based_sharding; +SET client_min_messages TO WARNING; +DROP SCHEMA citus_schema_move, s1 CASCADE; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 86ca9573a..fe203efb5 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1403,8 +1403,10 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- | function citus_internal_delete_placement_metadata(bigint) void | function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void - | function citus_pause_node_within_txn(integer,boolean,integer) void -(3 rows) + | function citus_pause_node_within_txn(integer,boolean,integer) void + | function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode) void + | function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode) void +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 1a4b5d04d..36bd504e8 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -118,6 +118,8 @@ ORDER BY 1; function citus_remove_node(text,integer) function citus_run_local_command(text) function citus_schema_distribute(regnamespace) + function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode) + function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode) function citus_schema_undistribute(regnamespace) function citus_server_id() function citus_set_coordinator_host(text,integer,noderole,name) @@ -341,5 +343,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(331 rows) +(333 rows) diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 290c1c110..145a83df2 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -307,6 +307,7 @@ test: mx_regular_user test: citus_locks test: global_cancel test: sequences_owned_by +test: citus_schema_move test: remove_coordinator # ---------- diff --git a/src/test/regress/sql/citus_schema_move.sql b/src/test/regress/sql/citus_schema_move.sql new file mode 100644 index 000000000..8240feff7 --- /dev/null +++ b/src/test/regress/sql/citus_schema_move.sql @@ -0,0 +1,175 @@ +CREATE SCHEMA citus_schema_move; +SET search_path TO citus_schema_move; + +SET citus.next_shard_id TO 2220000; +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; + +SET client_min_messages TO WARNING; +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + +SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + +-- Due to a race condition that happens in TransferShards() when the same shard id +-- is used to create the same shard on a different worker node, need to call +-- citus_cleanup_orphaned_resources() to clean up any orphaned resources before +-- running the tests. 
+-- +-- See https://github.com/citusdata/citus/pull/7180#issuecomment-1706786615. + +CALL citus_cleanup_orphaned_resources(); + +SET client_min_messages TO NOTICE; + +-- test null input, should be no-op +SELECT citus_schema_move(schema_id=>null, target_node_name=>null, target_node_port=>null, shard_transfer_mode=>null); +SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null); +SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null); + +SET citus.enable_schema_based_sharding TO ON; + +CREATE SCHEMA s1; + +-- test invalid schema +SELECT citus_schema_move('no_such_schema', 'dummy_node_name', 1234); +SELECT citus_schema_move('no_such_schema', 1234); + +-- test regular schema +SELECT citus_schema_move('citus_schema_move', 'dummy_node_name', 1234); +SELECT citus_schema_move('citus_schema_move', 1234); + +-- test empty distributed schema +SELECT citus_schema_move('s1', 'dummy_node_name', 1234); +SELECT citus_schema_move('s1', 1234); + +CREATE TABLE s1.t1 (a int); + +-- test invalid node name / port / id +SELECT citus_schema_move('s1', 'dummy_node_name', 1234); +SELECT citus_schema_move('s1', 1234); + +-- errors due to missing pkey / replicate ident. +SELECT citus_schema_move('s1', nodename, nodeport) FROM pg_dist_node +WHERE isactive AND shouldhaveshards AND noderole='primary' AND + (nodename, nodeport) NOT IN ( + SELECT nodename, nodeport FROM citus_shards WHERE table_name = 's1.t1'::regclass + ); + +-- errors as we try to move the schema to the same node +SELECT citus_schema_move('s1', nodename, nodeport, 'block_writes') +FROM citus_shards +JOIN pg_dist_node USING (nodename, nodeport) +WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass; + +SELECT citus_schema_move('s1', nodeid, 'block_writes') +FROM citus_shards +JOIN pg_dist_node USING (nodename, nodeport) +WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass; + +-- returns id, host name and host port of a non-coordinator node that given schema can be moved to +CREATE OR REPLACE FUNCTION get_non_coord_candidate_node_for_schema_move( + schema_id regnamespace) +RETURNS TABLE (nodeid integer, nodename text, nodeport integer) +SET search_path TO 'pg_catalog, public' +AS $func$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_dist_schema WHERE schemaid = schema_id) + THEN + RAISE EXCEPTION '% is not a distributed schema', schema_id; + END IF; + + CREATE TEMP TABLE nodeid_nodename_nodeport ON COMMIT DROP AS + SELECT pdn1.nodeid, pdn1.nodename, pdn1.nodeport + FROM pg_dist_node pdn1 + WHERE isactive AND shouldhaveshards AND noderole='primary' AND groupid != 0 AND + (pdn1.nodename, pdn1.nodeport) NOT IN ( + SELECT cs.nodename, cs.nodeport + FROM citus_shards cs + JOIN pg_dist_node pdn2 + ON cs.nodename = pdn2.nodename AND cs.nodeport = pdn2.nodeport + WHERE pdn2.noderole='primary' AND starts_with(table_name::text, schema_id::text) + ); + + IF NOT EXISTS (SELECT 1 FROM nodeid_nodename_nodeport) + THEN + RAISE EXCEPTION 'could not determine a node to move the schema to'; + END IF; + + RETURN QUERY SELECT * FROM nodeid_nodename_nodeport LIMIT 1; +END; +$func$ LANGUAGE plpgsql; + +CREATE TABLE s1.t2 (a int); + +-- move the schema to a different node + +SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s1') \gset + +SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes'); + +SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, 
nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + +SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s1') \gset + +SELECT citus_schema_move('s1', :s1_new_nodeid, 'block_writes'); + +SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + +-- move the schema to the coordinator + +SELECT citus_schema_move('s1', 'localhost', :master_port, 'block_writes'); + +SELECT ('localhost', :master_port) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + +-- move the schema away from the coordinator + +SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s1') \gset + +SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes'); + +SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text)); + +CREATE USER tenantuser superuser; +SET ROLE tenantuser; + +CREATE SCHEMA s2; +CREATE TABLE s2.t1 (a int); +CREATE TABLE s2.t2 (a int); + +CREATE USER regularuser; +SET ROLE regularuser; + +-- throws an error as the user is not the owner of the schema +SELECT citus_schema_move('s2', 'dummy_node', 1234); + +-- assign all tables to regularuser +RESET ROLE; +SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$); + +GRANT USAGE ON SCHEMA citus_schema_move TO regularuser; + +SET ROLE regularuser; + +SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport +FROM get_non_coord_candidate_node_for_schema_move('s2') \gset + +SELECT citus_schema_move('s2', :s2_new_nodename, :s2_new_nodeport, 'force_logical'); + +SELECT (:s2_new_nodename, :s2_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's2'::text)); + +SET client_min_messages TO WARNING; +DROP SCHEMA s2 CASCADE; +SET client_min_messages TO NOTICE; + +RESET ROLE; + +REVOKE USAGE ON SCHEMA citus_schema_move FROM regularuser; +DROP ROLE regularuser, tenantuser; + +RESET citus.enable_schema_based_sharding; + +SET client_min_messages TO WARNING; +DROP SCHEMA citus_schema_move, s1 CASCADE;