mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into alter_database_propagation
commit
fff24b8736
|
@ -59,6 +59,7 @@
|
||||||
#include "distributed/replication_origin_session_utils.h"
|
#include "distributed/replication_origin_session_utils.h"
|
||||||
#include "distributed/shared_library_init.h"
|
#include "distributed/shared_library_init.h"
|
||||||
#include "distributed/shard_utils.h"
|
#include "distributed/shard_utils.h"
|
||||||
|
#include "distributed/tenant_schema_metadata.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
#include "executor/spi.h"
|
#include "executor/spi.h"
|
||||||
|
|
|
@ -188,7 +188,16 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
|
||||||
pfree(collcollate);
|
pfree(collcollate);
|
||||||
pfree(collctype);
|
pfree(collctype);
|
||||||
#endif
|
#endif
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_16
|
||||||
|
char *collicurules = NULL;
|
||||||
|
datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collicurules, &isnull);
|
||||||
|
if (!isnull)
|
||||||
|
{
|
||||||
|
collicurules = TextDatumGetCString(datum);
|
||||||
|
appendStringInfo(&collationNameDef, ", rules = %s",
|
||||||
|
quote_literal_cstr(collicurules));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
if (!collisdeterministic)
|
if (!collisdeterministic)
|
||||||
{
|
{
|
||||||
appendStringInfoString(&collationNameDef, ", deterministic = false");
|
appendStringInfoString(&collationNameDef, ", deterministic = false");
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
#include "distributed/metadata_sync.h"
|
#include "distributed/metadata_sync.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
|
#include "distributed/tenant_schema_metadata.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "distributed/metadata_sync.h"
|
#include "distributed/metadata_sync.h"
|
||||||
#include "distributed/metadata/distobject.h"
|
#include "distributed/metadata/distobject.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
|
#include "distributed/shard_transfer.h"
|
||||||
#include "distributed/tenant_schema_metadata.h"
|
#include "distributed/tenant_schema_metadata.h"
|
||||||
#include "distributed/worker_shard_visibility.h"
|
#include "distributed/worker_shard_visibility.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
@ -29,6 +30,16 @@
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* return value of CreateCitusMoveSchemaParams() */
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
uint64 anchorShardId;
|
||||||
|
uint32 sourceNodeId;
|
||||||
|
char *sourceNodeName;
|
||||||
|
uint32 sourceNodePort;
|
||||||
|
} CitusMoveSchemaParams;
|
||||||
|
|
||||||
|
|
||||||
static void UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName);
|
static void UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName);
|
||||||
static List * SchemaGetNonShardTableIdList(Oid schemaId);
|
static List * SchemaGetNonShardTableIdList(Oid schemaId);
|
||||||
static void EnsureSchemaCanBeDistributed(Oid schemaId, List *schemaTableIdList);
|
static void EnsureSchemaCanBeDistributed(Oid schemaId, List *schemaTableIdList);
|
||||||
|
@ -36,10 +47,14 @@ static void EnsureTenantSchemaNameAllowed(Oid schemaId);
|
||||||
static void EnsureTableKindSupportedForTenantSchema(Oid relationId);
|
static void EnsureTableKindSupportedForTenantSchema(Oid relationId);
|
||||||
static void EnsureFKeysForTenantTable(Oid relationId);
|
static void EnsureFKeysForTenantTable(Oid relationId);
|
||||||
static void EnsureSchemaExist(Oid schemaId);
|
static void EnsureSchemaExist(Oid schemaId);
|
||||||
|
static CitusMoveSchemaParams * CreateCitusMoveSchemaParams(Oid schemaId);
|
||||||
|
static uint64 TenantSchemaPickAnchorShardId(Oid schemaId);
|
||||||
|
|
||||||
|
|
||||||
/* controlled via citus.enable_schema_based_sharding GUC */
|
/* controlled via citus.enable_schema_based_sharding GUC */
|
||||||
bool EnableSchemaBasedSharding = false;
|
bool EnableSchemaBasedSharding = false;
|
||||||
|
|
||||||
|
|
||||||
const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
|
const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
|
||||||
"undistribute_table",
|
"undistribute_table",
|
||||||
"alter_distributed_table",
|
"alter_distributed_table",
|
||||||
|
@ -52,6 +67,8 @@ const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_unregister_tenant_schema_globally);
|
PG_FUNCTION_INFO_V1(citus_internal_unregister_tenant_schema_globally);
|
||||||
PG_FUNCTION_INFO_V1(citus_schema_distribute);
|
PG_FUNCTION_INFO_V1(citus_schema_distribute);
|
||||||
PG_FUNCTION_INFO_V1(citus_schema_undistribute);
|
PG_FUNCTION_INFO_V1(citus_schema_undistribute);
|
||||||
|
PG_FUNCTION_INFO_V1(citus_schema_move);
|
||||||
|
PG_FUNCTION_INFO_V1(citus_schema_move_with_nodeid);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShouldUseSchemaBasedSharding returns true if schema given name should be
|
* ShouldUseSchemaBasedSharding returns true if schema given name should be
|
||||||
|
@ -757,6 +774,139 @@ citus_schema_undistribute(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_schema_move moves the shards that belong to given distributed tenant
|
||||||
|
* schema from one node to the other node by using citus_move_shard_placement().
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
citus_schema_move(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
EnsureCoordinator();
|
||||||
|
|
||||||
|
Oid schemaId = PG_GETARG_OID(0);
|
||||||
|
CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);
|
||||||
|
|
||||||
|
DirectFunctionCall6(citus_move_shard_placement,
|
||||||
|
UInt64GetDatum(params->anchorShardId),
|
||||||
|
CStringGetTextDatum(params->sourceNodeName),
|
||||||
|
UInt32GetDatum(params->sourceNodePort),
|
||||||
|
PG_GETARG_DATUM(1),
|
||||||
|
PG_GETARG_DATUM(2),
|
||||||
|
PG_GETARG_DATUM(3));
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_schema_move_with_nodeid does the same as citus_schema_move(), but
|
||||||
|
* accepts node id as parameter instead of hostname and port, hence uses
|
||||||
|
* citus_move_shard_placement_with_nodeid().
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
citus_schema_move_with_nodeid(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
EnsureCoordinator();
|
||||||
|
|
||||||
|
Oid schemaId = PG_GETARG_OID(0);
|
||||||
|
CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);
|
||||||
|
|
||||||
|
DirectFunctionCall4(citus_move_shard_placement_with_nodeid,
|
||||||
|
UInt64GetDatum(params->anchorShardId),
|
||||||
|
UInt32GetDatum(params->sourceNodeId),
|
||||||
|
PG_GETARG_DATUM(1),
|
||||||
|
PG_GETARG_DATUM(2));
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateCitusMoveSchemaParams is a helper function for
|
||||||
|
* citus_schema_move() and citus_schema_move_with_nodeid()
|
||||||
|
* that validates input schema and returns the parameters to be used in underlying
|
||||||
|
* shard transfer functions.
|
||||||
|
*/
|
||||||
|
static CitusMoveSchemaParams *
|
||||||
|
CreateCitusMoveSchemaParams(Oid schemaId)
|
||||||
|
{
|
||||||
|
EnsureSchemaExist(schemaId);
|
||||||
|
EnsureSchemaOwner(schemaId);
|
||||||
|
|
||||||
|
if (!IsTenantSchema(schemaId))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("schema %s is not a distributed schema",
|
||||||
|
get_namespace_name(schemaId))));
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64 anchorShardId = TenantSchemaPickAnchorShardId(schemaId);
|
||||||
|
if (anchorShardId == INVALID_SHARD_ID)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("cannot move distributed schema %s because it is empty",
|
||||||
|
get_namespace_name(schemaId))));
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
|
||||||
|
uint32 sourceNodeId = SingleShardTableColocationNodeId(colocationId);
|
||||||
|
|
||||||
|
bool missingOk = false;
|
||||||
|
WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
|
||||||
|
|
||||||
|
CitusMoveSchemaParams *params = palloc0(sizeof(CitusMoveSchemaParams));
|
||||||
|
params->anchorShardId = anchorShardId;
|
||||||
|
params->sourceNodeId = sourceNodeId;
|
||||||
|
params->sourceNodeName = sourceNode->workerName;
|
||||||
|
params->sourceNodePort = sourceNode->workerPort;
|
||||||
|
return params;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TenantSchemaPickAnchorShardId returns the id of one of the shards
|
||||||
|
* created in given tenant schema.
|
||||||
|
*
|
||||||
|
* Returns INVALID_SHARD_ID if the schema was initially empty or if it's not
|
||||||
|
* a tenant schema.
|
||||||
|
*
|
||||||
|
* Throws an error if all the tables in the schema are concurrently dropped.
|
||||||
|
*/
|
||||||
|
static uint64
|
||||||
|
TenantSchemaPickAnchorShardId(Oid schemaId)
|
||||||
|
{
|
||||||
|
uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
|
||||||
|
List *tablesInSchema = ColocationGroupTableList(colocationId, 0);
|
||||||
|
if (list_length(tablesInSchema) == 0)
|
||||||
|
{
|
||||||
|
return INVALID_SHARD_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
Oid relationId = InvalidOid;
|
||||||
|
foreach_oid(relationId, tablesInSchema)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Make sure the relation isn't dropped for the remainder of
|
||||||
|
* the transaction.
|
||||||
|
*/
|
||||||
|
LockRelationOid(relationId, AccessShareLock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The relation might have been dropped just before we locked it.
|
||||||
|
* Let's look it up.
|
||||||
|
*/
|
||||||
|
Relation relation = RelationIdGetRelation(relationId);
|
||||||
|
if (RelationIsValid(relation))
|
||||||
|
{
|
||||||
|
/* relation still exists, we can use it */
|
||||||
|
RelationClose(relation);
|
||||||
|
return GetFirstShardId(relationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ereport(ERROR, (errmsg("tables in schema %s are concurrently dropped",
|
||||||
|
get_namespace_name(schemaId))));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfTenantTable errors out with the given operation name,
|
* ErrorIfTenantTable errors out with the given operation name,
|
||||||
* if the given relation is a tenant table.
|
* if the given relation is a tenant table.
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "pg_version_compat.h"
|
#include "pg_version_compat.h"
|
||||||
|
|
||||||
|
#include "commands/defrem.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/deparser.h"
|
#include "distributed/deparser.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
@ -384,7 +385,10 @@ AppendGrantWithAdminOption(StringInfo buf, GrantRoleStmt *stmt)
|
||||||
DefElem *opt = NULL;
|
DefElem *opt = NULL;
|
||||||
foreach_ptr(opt, stmt->opt)
|
foreach_ptr(opt, stmt->opt)
|
||||||
{
|
{
|
||||||
if (strcmp(opt->defname, "admin") == 0)
|
bool admin_option = false;
|
||||||
|
char *optval = defGetString(opt);
|
||||||
|
if (strcmp(opt->defname, "admin") == 0 &&
|
||||||
|
parse_bool(optval, &admin_option) && admin_option)
|
||||||
{
|
{
|
||||||
appendStringInfo(buf, " WITH ADMIN OPTION");
|
appendStringInfo(buf, " WITH ADMIN OPTION");
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -8,3 +8,5 @@
|
||||||
|
|
||||||
#include "udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql"
|
#include "udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql"
|
||||||
#include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql"
|
#include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql"
|
||||||
|
|
||||||
|
#include "udfs/citus_schema_move/12.1-1.sql"
|
||||||
|
|
|
@ -13,3 +13,12 @@ DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
|
||||||
placement_id bigint
|
placement_id bigint
|
||||||
);
|
);
|
||||||
|
|
||||||
|
DROP FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace, target_node_name text, target_node_port integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode
|
||||||
|
);
|
||||||
|
|
||||||
|
DROP FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace, target_node_id integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode
|
||||||
|
);
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
-- citus_schema_move, using target node name and node port
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_name text,
|
||||||
|
target_node_port integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode default 'auto')
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_schema_move$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_name text,
|
||||||
|
target_node_port integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode)
|
||||||
|
IS 'move a distributed schema to given node';
|
||||||
|
|
||||||
|
-- citus_schema_move, using target node id
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_id integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode default 'auto')
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_id integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode)
|
||||||
|
IS 'move a distributed schema to given node';
|
|
@ -0,0 +1,29 @@
|
||||||
|
-- citus_schema_move, using target node name and node port
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_name text,
|
||||||
|
target_node_port integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode default 'auto')
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_schema_move$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_name text,
|
||||||
|
target_node_port integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode)
|
||||||
|
IS 'move a distributed schema to given node';
|
||||||
|
|
||||||
|
-- citus_schema_move, using target node id
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_id integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode default 'auto')
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_schema_move(
|
||||||
|
schema_id regnamespace,
|
||||||
|
target_node_id integer,
|
||||||
|
shard_transfer_mode citus.shard_transfer_mode)
|
||||||
|
IS 'move a distributed schema to given node';
|
|
@ -826,7 +826,6 @@ extern void UpdateAutoConvertedForConnectedRelations(List *relationId, bool
|
||||||
/* schema_based_sharding.c */
|
/* schema_based_sharding.c */
|
||||||
extern bool ShouldUseSchemaBasedSharding(char *schemaName);
|
extern bool ShouldUseSchemaBasedSharding(char *schemaName);
|
||||||
extern bool ShouldCreateTenantSchemaTable(Oid relationId);
|
extern bool ShouldCreateTenantSchemaTable(Oid relationId);
|
||||||
extern bool IsTenantSchema(Oid schemaId);
|
|
||||||
extern void EnsureTenantTable(Oid relationId, char *operationName);
|
extern void EnsureTenantTable(Oid relationId, char *operationName);
|
||||||
extern void ErrorIfIllegalPartitioningInTenantSchema(Oid parentRelationId,
|
extern void ErrorIfIllegalPartitioningInTenantSchema(Oid parentRelationId,
|
||||||
Oid partitionRelationId);
|
Oid partitionRelationId);
|
||||||
|
|
|
@ -12,6 +12,9 @@
|
||||||
#include "distributed/shard_rebalancer.h"
|
#include "distributed/shard_rebalancer.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
extern Datum citus_move_shard_placement(PG_FUNCTION_ARGS);
|
||||||
|
extern Datum citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS);
|
||||||
|
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
SHARD_TRANSFER_INVALID_FIRST = 0,
|
SHARD_TRANSFER_INVALID_FIRST = 0,
|
||||||
|
|
|
@ -0,0 +1,224 @@
|
||||||
|
CREATE SCHEMA citus_schema_move;
|
||||||
|
SET search_path TO citus_schema_move;
|
||||||
|
SET citus.next_shard_id TO 2220000;
|
||||||
|
SET citus.shard_count TO 32;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Due to a race condition that happens in TransferShards() when the same shard id
|
||||||
|
-- is used to create the same shard on a different worker node, need to call
|
||||||
|
-- citus_cleanup_orphaned_resources() to clean up any orphaned resources before
|
||||||
|
-- running the tests.
|
||||||
|
--
|
||||||
|
-- See https://github.com/citusdata/citus/pull/7180#issuecomment-1706786615.
|
||||||
|
CALL citus_cleanup_orphaned_resources();
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
-- test null input, should be no-op
|
||||||
|
SELECT citus_schema_move(schema_id=>null, target_node_name=>null, target_node_port=>null, shard_transfer_mode=>null);
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.enable_schema_based_sharding TO ON;
|
||||||
|
CREATE SCHEMA s1;
|
||||||
|
-- test invalid schema
|
||||||
|
SELECT citus_schema_move('no_such_schema', 'dummy_node_name', 1234);
|
||||||
|
ERROR: schema "no_such_schema" does not exist
|
||||||
|
SELECT citus_schema_move('no_such_schema', 1234);
|
||||||
|
ERROR: schema "no_such_schema" does not exist
|
||||||
|
-- test regular schema
|
||||||
|
SELECT citus_schema_move('citus_schema_move', 'dummy_node_name', 1234);
|
||||||
|
ERROR: schema citus_schema_move is not a distributed schema
|
||||||
|
SELECT citus_schema_move('citus_schema_move', 1234);
|
||||||
|
ERROR: schema citus_schema_move is not a distributed schema
|
||||||
|
-- test empty distributed schema
|
||||||
|
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
|
||||||
|
ERROR: cannot move distributed schema s1 because it is empty
|
||||||
|
SELECT citus_schema_move('s1', 1234);
|
||||||
|
ERROR: cannot move distributed schema s1 because it is empty
|
||||||
|
CREATE TABLE s1.t1 (a int);
|
||||||
|
-- test invalid node name / port / id
|
||||||
|
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
|
||||||
|
ERROR: Moving shards to a non-existing node is not supported
|
||||||
|
HINT: Add the target node via SELECT citus_add_node('dummy_node_name', 1234);
|
||||||
|
SELECT citus_schema_move('s1', 1234);
|
||||||
|
ERROR: node with node id 1234 could not be found
|
||||||
|
-- errors due to missing pkey / replicate ident.
|
||||||
|
SELECT citus_schema_move('s1', nodename, nodeport) FROM pg_dist_node
|
||||||
|
WHERE isactive AND shouldhaveshards AND noderole='primary' AND
|
||||||
|
(nodename, nodeport) NOT IN (
|
||||||
|
SELECT nodename, nodeport FROM citus_shards WHERE table_name = 's1.t1'::regclass
|
||||||
|
);
|
||||||
|
ERROR: cannot use logical replication to transfer shards of the relation t1 since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
|
||||||
|
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
|
||||||
|
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
|
||||||
|
-- errors as we try to move the schema to the same node
|
||||||
|
SELECT citus_schema_move('s1', nodename, nodeport, 'block_writes')
|
||||||
|
FROM citus_shards
|
||||||
|
JOIN pg_dist_node USING (nodename, nodeport)
|
||||||
|
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
|
||||||
|
ERROR: cannot move shard to the same node
|
||||||
|
SELECT citus_schema_move('s1', nodeid, 'block_writes')
|
||||||
|
FROM citus_shards
|
||||||
|
JOIN pg_dist_node USING (nodename, nodeport)
|
||||||
|
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
|
||||||
|
ERROR: cannot move shard to the same node
|
||||||
|
-- returns id, host name and host port of a non-coordinator node that given schema can be moved to
|
||||||
|
CREATE OR REPLACE FUNCTION get_non_coord_candidate_node_for_schema_move(
|
||||||
|
schema_id regnamespace)
|
||||||
|
RETURNS TABLE (nodeid integer, nodename text, nodeport integer)
|
||||||
|
SET search_path TO 'pg_catalog, public'
|
||||||
|
AS $func$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (SELECT 1 FROM pg_dist_schema WHERE schemaid = schema_id)
|
||||||
|
THEN
|
||||||
|
RAISE EXCEPTION '% is not a distributed schema', schema_id;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
CREATE TEMP TABLE nodeid_nodename_nodeport ON COMMIT DROP AS
|
||||||
|
SELECT pdn1.nodeid, pdn1.nodename, pdn1.nodeport
|
||||||
|
FROM pg_dist_node pdn1
|
||||||
|
WHERE isactive AND shouldhaveshards AND noderole='primary' AND groupid != 0 AND
|
||||||
|
(pdn1.nodename, pdn1.nodeport) NOT IN (
|
||||||
|
SELECT cs.nodename, cs.nodeport
|
||||||
|
FROM citus_shards cs
|
||||||
|
JOIN pg_dist_node pdn2
|
||||||
|
ON cs.nodename = pdn2.nodename AND cs.nodeport = pdn2.nodeport
|
||||||
|
WHERE pdn2.noderole='primary' AND starts_with(table_name::text, schema_id::text)
|
||||||
|
);
|
||||||
|
|
||||||
|
IF NOT EXISTS (SELECT 1 FROM nodeid_nodename_nodeport)
|
||||||
|
THEN
|
||||||
|
RAISE EXCEPTION 'could not determine a node to move the schema to';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN QUERY SELECT * FROM nodeid_nodename_nodeport LIMIT 1;
|
||||||
|
END;
|
||||||
|
$func$ LANGUAGE plpgsql;
|
||||||
|
CREATE TABLE s1.t2 (a int);
|
||||||
|
-- move the schema to a different node
|
||||||
|
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
|
||||||
|
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
|
||||||
|
SELECT citus_schema_move('s1', :s1_new_nodeid, 'block_writes');
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- move the schema to the coordinator
|
||||||
|
SELECT citus_schema_move('s1', 'localhost', :master_port, 'block_writes');
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT ('localhost', :master_port) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- move the schema away from the coordinator
|
||||||
|
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
|
||||||
|
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE USER tenantuser superuser;
|
||||||
|
SET ROLE tenantuser;
|
||||||
|
CREATE SCHEMA s2;
|
||||||
|
CREATE TABLE s2.t1 (a int);
|
||||||
|
CREATE TABLE s2.t2 (a int);
|
||||||
|
CREATE USER regularuser;
|
||||||
|
SET ROLE regularuser;
|
||||||
|
-- throws an error as the user is not the owner of the schema
|
||||||
|
SELECT citus_schema_move('s2', 'dummy_node', 1234);
|
||||||
|
ERROR: must be owner of schema s2
|
||||||
|
-- assign all tables to regularuser
|
||||||
|
RESET ROLE;
|
||||||
|
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$);
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
REASSIGN OWNED
|
||||||
|
REASSIGN OWNED
|
||||||
|
REASSIGN OWNED
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
GRANT USAGE ON SCHEMA citus_schema_move TO regularuser;
|
||||||
|
SET ROLE regularuser;
|
||||||
|
SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s2') \gset
|
||||||
|
SELECT citus_schema_move('s2', :s2_new_nodename, :s2_new_nodeport, 'force_logical');
|
||||||
|
citus_schema_move
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT (:s2_new_nodename, :s2_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's2'::text));
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA s2 CASCADE;
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
RESET ROLE;
|
||||||
|
REVOKE USAGE ON SCHEMA citus_schema_move FROM regularuser;
|
||||||
|
DROP ROLE regularuser, tenantuser;
|
||||||
|
RESET citus.enable_schema_based_sharding;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA citus_schema_move, s1 CASCADE;
|
|
@ -1403,8 +1403,10 @@ SELECT * FROM multi_extension.print_extension_changes();
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
| function citus_internal_delete_placement_metadata(bigint) void
|
| function citus_internal_delete_placement_metadata(bigint) void
|
||||||
| function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void
|
| function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void
|
||||||
| function citus_pause_node_within_txn(integer,boolean,integer) void
|
| function citus_pause_node_within_txn(integer,boolean,integer) void
|
||||||
(3 rows)
|
| function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode) void
|
||||||
|
| function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode) void
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||||
-- show running version
|
-- show running version
|
||||||
|
|
|
@ -313,6 +313,90 @@ SELECT result FROM run_command_on_workers
|
||||||
DROP DATABASE
|
DROP DATABASE
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
SET search_path TO pg16;
|
||||||
|
-- New rules option added to CREATE COLLATION
|
||||||
|
-- Similar to above test with CREATE DATABASE
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/30a53b7
|
||||||
|
CREATE COLLATION default_rule (provider = icu, locale = '');
|
||||||
|
NOTICE: using standard form "und" for ICU locale ""
|
||||||
|
CREATE COLLATION special_rule (provider = icu, locale = '', rules = '&a < g');
|
||||||
|
NOTICE: using standard form "und" for ICU locale ""
|
||||||
|
CREATE TABLE test_collation_rules (a text);
|
||||||
|
SELECT create_distributed_table('test_collation_rules', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_collation_rules VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
|
||||||
|
SELECT collname, collprovider, colliculocale, collicurules
|
||||||
|
FROM pg_collation
|
||||||
|
WHERE collname like '%_rule%'
|
||||||
|
ORDER BY 1;
|
||||||
|
collname | collprovider | colliculocale | collicurules
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
default_rule | i | und |
|
||||||
|
special_rule | i | und | &a < g
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE default_rule;
|
||||||
|
a
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Abernathy
|
||||||
|
apple
|
||||||
|
bird
|
||||||
|
Boston
|
||||||
|
Graham
|
||||||
|
green
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE special_rule;
|
||||||
|
a
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Abernathy
|
||||||
|
apple
|
||||||
|
green
|
||||||
|
bird
|
||||||
|
Boston
|
||||||
|
Graham
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO pg16;
|
||||||
|
SELECT collname, collprovider, colliculocale, collicurules
|
||||||
|
FROM pg_collation
|
||||||
|
WHERE collname like '%_rule%'
|
||||||
|
ORDER BY 1;
|
||||||
|
collname | collprovider | colliculocale | collicurules
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
default_rule | i | und |
|
||||||
|
special_rule | i | und | &a < g
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE default_rule;
|
||||||
|
a
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Abernathy
|
||||||
|
apple
|
||||||
|
bird
|
||||||
|
Boston
|
||||||
|
Graham
|
||||||
|
green
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE special_rule;
|
||||||
|
a
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Abernathy
|
||||||
|
apple
|
||||||
|
green
|
||||||
|
bird
|
||||||
|
Boston
|
||||||
|
Graham
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
SET search_path TO pg16;
|
SET search_path TO pg16;
|
||||||
SET citus.next_shard_id TO 951000;
|
SET citus.next_shard_id TO 951000;
|
||||||
-- Foreign table TRUNCATE trigger
|
-- Foreign table TRUNCATE trigger
|
||||||
|
@ -520,6 +604,88 @@ SET citus.shard_replication_factor TO 1;
|
||||||
-- DEFAULT cannot be used in COPY TO
|
-- DEFAULT cannot be used in COPY TO
|
||||||
COPY (select 1 as test) TO stdout WITH (default '\D');
|
COPY (select 1 as test) TO stdout WITH (default '\D');
|
||||||
ERROR: COPY DEFAULT only available using COPY FROM
|
ERROR: COPY DEFAULT only available using COPY FROM
|
||||||
|
-- Tests for SQL/JSON: JSON_ARRAYAGG and JSON_OBJECTAGG aggregates
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/7081ac4
|
||||||
|
SET citus.next_shard_id TO 952000;
|
||||||
|
CREATE TABLE agg_test(a int, b serial);
|
||||||
|
SELECT create_distributed_table('agg_test', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO agg_test SELECT i FROM generate_series(1, 5) i;
|
||||||
|
-- JSON_ARRAYAGG with distribution key
|
||||||
|
SELECT JSON_ARRAYAGG(a ORDER BY a),
|
||||||
|
JSON_ARRAYAGG(a ORDER BY a RETURNING jsonb)
|
||||||
|
FROM agg_test;
|
||||||
|
json_arrayagg | json_arrayagg
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
[1, 2, 3, 4, 5] | [1, 2, 3, 4, 5]
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- JSON_ARRAYAGG with other column
|
||||||
|
SELECT JSON_ARRAYAGG(b ORDER BY b),
|
||||||
|
JSON_ARRAYAGG(b ORDER BY b RETURNING jsonb)
|
||||||
|
FROM agg_test;
|
||||||
|
json_arrayagg | json_arrayagg
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
[1, 2, 3, 4, 5] | [1, 2, 3, 4, 5]
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- JSON_ARRAYAGG with router query
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
SELECT JSON_ARRAYAGG(a ORDER BY a),
|
||||||
|
JSON_ARRAYAGG(a ORDER BY a RETURNING jsonb)
|
||||||
|
FROM agg_test WHERE a = 2;
|
||||||
|
NOTICE: issuing SELECT JSON_ARRAYAGG(a ORDER BY a RETURNING json) AS "json_arrayagg", JSON_ARRAYAGG(a ORDER BY a RETURNING jsonb) AS "json_arrayagg" FROM pg16.agg_test_952000 agg_test WHERE (a OPERATOR(pg_catalog.=) 2)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
json_arrayagg | json_arrayagg
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
[2] | [2]
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
-- JSON_OBJECTAGG with distribution key
|
||||||
|
SELECT
|
||||||
|
JSON_OBJECTAGG(a: a),
|
||||||
|
JSON_ARRAYAGG(a ORDER BY a), -- for order
|
||||||
|
JSON_OBJECTAGG(a: a RETURNING jsonb)
|
||||||
|
FROM
|
||||||
|
agg_test;
|
||||||
|
json_objectagg | json_arrayagg | json_objectagg
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{ "1" : 1, "2" : 2, "3" : 3, "4" : 4, "5" : 5 } | [1, 2, 3, 4, 5] | {"1": 1, "2": 2, "3": 3, "4": 4, "5": 5}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- JSON_OBJECTAGG with other column
|
||||||
|
SELECT
|
||||||
|
JSON_OBJECTAGG(b: b),
|
||||||
|
JSON_ARRAYAGG(b ORDER BY b), -- for order
|
||||||
|
JSON_OBJECTAGG(b: b RETURNING jsonb)
|
||||||
|
FROM
|
||||||
|
agg_test;
|
||||||
|
json_objectagg | json_arrayagg | json_objectagg
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{ "1" : 1, "2" : 2, "3" : 3, "4" : 4, "5" : 5 } | [1, 2, 3, 4, 5] | {"1": 1, "2": 2, "3": 3, "4": 4, "5": 5}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- JSON_OBJECTAGG with router query
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
SELECT
|
||||||
|
JSON_OBJECTAGG(a: a),
|
||||||
|
JSON_OBJECTAGG(a: a RETURNING jsonb)
|
||||||
|
FROM
|
||||||
|
agg_test WHERE a = 3;
|
||||||
|
NOTICE: issuing SELECT JSON_OBJECTAGG(a : a RETURNING json) AS "json_objectagg", JSON_OBJECTAGG(a : a RETURNING jsonb) AS "json_objectagg" FROM pg16.agg_test_952000 agg_test WHERE (a OPERATOR(pg_catalog.=) 3)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
json_objectagg | json_objectagg
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{ "3" : 3 } | {"3": 3}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
-- Tests for SQL/JSON: support the IS JSON predicate
|
-- Tests for SQL/JSON: support the IS JSON predicate
|
||||||
-- Relevant PG commit:
|
-- Relevant PG commit:
|
||||||
-- https://github.com/postgres/postgres/commit/6ee30209
|
-- https://github.com/postgres/postgres/commit/6ee30209
|
||||||
|
@ -805,6 +971,44 @@ LEFT JOIN ref_table ON TRUE;
|
||||||
1.19
|
1.19
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- PG16 added WITH ADMIN FALSE option to GRANT ROLE
|
||||||
|
-- WITH ADMIN FALSE is the default, make sure we propagate correctly in Citus
|
||||||
|
-- Relevant PG commit: https://github.com/postgres/postgres/commit/e3ce2de
|
||||||
|
--
|
||||||
|
CREATE ROLE role1;
|
||||||
|
CREATE ROLE role2;
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
SET citus.grep_remote_commands = '%GRANT%';
|
||||||
|
-- default admin option is false
|
||||||
|
GRANT role1 TO role2;
|
||||||
|
NOTICE: issuing GRANT role1 TO role2;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing GRANT role1 TO role2;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
-- should behave same as default
|
||||||
|
GRANT role1 TO role2 WITH ADMIN FALSE;
|
||||||
|
NOTICE: issuing GRANT role1 TO role2;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing GRANT role1 TO role2;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
-- with admin option and with admin true are the same
|
||||||
|
GRANT role1 TO role2 WITH ADMIN OPTION;
|
||||||
|
NOTICE: issuing GRANT role1 TO role2 WITH ADMIN OPTION;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing GRANT role1 TO role2 WITH ADMIN OPTION;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
GRANT role1 TO role2 WITH ADMIN TRUE;
|
||||||
|
NOTICE: issuing GRANT role1 TO role2 WITH ADMIN OPTION;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing GRANT role1 TO role2 WITH ADMIN OPTION;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
RESET citus.grep_remote_commands;
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP EXTENSION postgres_fdw CASCADE;
|
DROP EXTENSION postgres_fdw CASCADE;
|
||||||
|
|
|
@ -118,6 +118,8 @@ ORDER BY 1;
|
||||||
function citus_remove_node(text,integer)
|
function citus_remove_node(text,integer)
|
||||||
function citus_run_local_command(text)
|
function citus_run_local_command(text)
|
||||||
function citus_schema_distribute(regnamespace)
|
function citus_schema_distribute(regnamespace)
|
||||||
|
function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode)
|
||||||
|
function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode)
|
||||||
function citus_schema_undistribute(regnamespace)
|
function citus_schema_undistribute(regnamespace)
|
||||||
function citus_server_id()
|
function citus_server_id()
|
||||||
function citus_set_coordinator_host(text,integer,noderole,name)
|
function citus_set_coordinator_host(text,integer,noderole,name)
|
||||||
|
@ -341,5 +343,5 @@ ORDER BY 1;
|
||||||
view citus_stat_tenants_local
|
view citus_stat_tenants_local
|
||||||
view pg_dist_shard_placement
|
view pg_dist_shard_placement
|
||||||
view time_partitions
|
view time_partitions
|
||||||
(331 rows)
|
(333 rows)
|
||||||
|
|
||||||
|
|
|
@ -308,6 +308,7 @@ test: mx_regular_user
|
||||||
test: citus_locks
|
test: citus_locks
|
||||||
test: global_cancel
|
test: global_cancel
|
||||||
test: sequences_owned_by
|
test: sequences_owned_by
|
||||||
|
test: citus_schema_move
|
||||||
test: remove_coordinator
|
test: remove_coordinator
|
||||||
|
|
||||||
# ----------
|
# ----------
|
||||||
|
|
|
@ -0,0 +1,175 @@
|
||||||
|
CREATE SCHEMA citus_schema_move;
|
||||||
|
SET search_path TO citus_schema_move;
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 2220000;
|
||||||
|
SET citus.shard_count TO 32;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
|
||||||
|
|
||||||
|
SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
|
||||||
|
|
||||||
|
-- Due to a race condition that happens in TransferShards() when the same shard id
|
||||||
|
-- is used to create the same shard on a different worker node, need to call
|
||||||
|
-- citus_cleanup_orphaned_resources() to clean up any orphaned resources before
|
||||||
|
-- running the tests.
|
||||||
|
--
|
||||||
|
-- See https://github.com/citusdata/citus/pull/7180#issuecomment-1706786615.
|
||||||
|
|
||||||
|
CALL citus_cleanup_orphaned_resources();
|
||||||
|
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
|
||||||
|
-- test null input, should be no-op
|
||||||
|
SELECT citus_schema_move(schema_id=>null, target_node_name=>null, target_node_port=>null, shard_transfer_mode=>null);
|
||||||
|
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
|
||||||
|
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
|
||||||
|
|
||||||
|
SET citus.enable_schema_based_sharding TO ON;
|
||||||
|
|
||||||
|
CREATE SCHEMA s1;
|
||||||
|
|
||||||
|
-- test invalid schema
|
||||||
|
SELECT citus_schema_move('no_such_schema', 'dummy_node_name', 1234);
|
||||||
|
SELECT citus_schema_move('no_such_schema', 1234);
|
||||||
|
|
||||||
|
-- test regular schema
|
||||||
|
SELECT citus_schema_move('citus_schema_move', 'dummy_node_name', 1234);
|
||||||
|
SELECT citus_schema_move('citus_schema_move', 1234);
|
||||||
|
|
||||||
|
-- test empty distributed schema
|
||||||
|
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
|
||||||
|
SELECT citus_schema_move('s1', 1234);
|
||||||
|
|
||||||
|
CREATE TABLE s1.t1 (a int);
|
||||||
|
|
||||||
|
-- test invalid node name / port / id
|
||||||
|
SELECT citus_schema_move('s1', 'dummy_node_name', 1234);
|
||||||
|
SELECT citus_schema_move('s1', 1234);
|
||||||
|
|
||||||
|
-- errors due to missing pkey / replicate ident.
|
||||||
|
SELECT citus_schema_move('s1', nodename, nodeport) FROM pg_dist_node
|
||||||
|
WHERE isactive AND shouldhaveshards AND noderole='primary' AND
|
||||||
|
(nodename, nodeport) NOT IN (
|
||||||
|
SELECT nodename, nodeport FROM citus_shards WHERE table_name = 's1.t1'::regclass
|
||||||
|
);
|
||||||
|
|
||||||
|
-- errors as we try to move the schema to the same node
|
||||||
|
SELECT citus_schema_move('s1', nodename, nodeport, 'block_writes')
|
||||||
|
FROM citus_shards
|
||||||
|
JOIN pg_dist_node USING (nodename, nodeport)
|
||||||
|
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
|
||||||
|
|
||||||
|
SELECT citus_schema_move('s1', nodeid, 'block_writes')
|
||||||
|
FROM citus_shards
|
||||||
|
JOIN pg_dist_node USING (nodename, nodeport)
|
||||||
|
WHERE noderole = 'primary' AND table_name = 's1.t1'::regclass;
|
||||||
|
|
||||||
|
-- returns id, host name and host port of a non-coordinator node that given schema can be moved to
|
||||||
|
CREATE OR REPLACE FUNCTION get_non_coord_candidate_node_for_schema_move(
|
||||||
|
schema_id regnamespace)
|
||||||
|
RETURNS TABLE (nodeid integer, nodename text, nodeport integer)
|
||||||
|
SET search_path TO 'pg_catalog, public'
|
||||||
|
AS $func$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (SELECT 1 FROM pg_dist_schema WHERE schemaid = schema_id)
|
||||||
|
THEN
|
||||||
|
RAISE EXCEPTION '% is not a distributed schema', schema_id;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
CREATE TEMP TABLE nodeid_nodename_nodeport ON COMMIT DROP AS
|
||||||
|
SELECT pdn1.nodeid, pdn1.nodename, pdn1.nodeport
|
||||||
|
FROM pg_dist_node pdn1
|
||||||
|
WHERE isactive AND shouldhaveshards AND noderole='primary' AND groupid != 0 AND
|
||||||
|
(pdn1.nodename, pdn1.nodeport) NOT IN (
|
||||||
|
SELECT cs.nodename, cs.nodeport
|
||||||
|
FROM citus_shards cs
|
||||||
|
JOIN pg_dist_node pdn2
|
||||||
|
ON cs.nodename = pdn2.nodename AND cs.nodeport = pdn2.nodeport
|
||||||
|
WHERE pdn2.noderole='primary' AND starts_with(table_name::text, schema_id::text)
|
||||||
|
);
|
||||||
|
|
||||||
|
IF NOT EXISTS (SELECT 1 FROM nodeid_nodename_nodeport)
|
||||||
|
THEN
|
||||||
|
RAISE EXCEPTION 'could not determine a node to move the schema to';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN QUERY SELECT * FROM nodeid_nodename_nodeport LIMIT 1;
|
||||||
|
END;
|
||||||
|
$func$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
CREATE TABLE s1.t2 (a int);
|
||||||
|
|
||||||
|
-- move the schema to a different node
|
||||||
|
|
||||||
|
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
|
||||||
|
|
||||||
|
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
|
||||||
|
|
||||||
|
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
|
||||||
|
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
|
||||||
|
|
||||||
|
SELECT citus_schema_move('s1', :s1_new_nodeid, 'block_writes');
|
||||||
|
|
||||||
|
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
|
||||||
|
-- move the schema to the coordinator
|
||||||
|
|
||||||
|
SELECT citus_schema_move('s1', 'localhost', :master_port, 'block_writes');
|
||||||
|
|
||||||
|
SELECT ('localhost', :master_port) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
|
||||||
|
-- move the schema away from the coordinator
|
||||||
|
|
||||||
|
SELECT nodeid AS s1_new_nodeid, quote_literal(nodename) AS s1_new_nodename, nodeport AS s1_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset
|
||||||
|
|
||||||
|
SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');
|
||||||
|
|
||||||
|
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's1'::text));
|
||||||
|
|
||||||
|
CREATE USER tenantuser superuser;
|
||||||
|
SET ROLE tenantuser;
|
||||||
|
|
||||||
|
CREATE SCHEMA s2;
|
||||||
|
CREATE TABLE s2.t1 (a int);
|
||||||
|
CREATE TABLE s2.t2 (a int);
|
||||||
|
|
||||||
|
CREATE USER regularuser;
|
||||||
|
SET ROLE regularuser;
|
||||||
|
|
||||||
|
-- throws an error as the user is not the owner of the schema
|
||||||
|
SELECT citus_schema_move('s2', 'dummy_node', 1234);
|
||||||
|
|
||||||
|
-- assign all tables to regularuser
|
||||||
|
RESET ROLE;
|
||||||
|
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$);
|
||||||
|
|
||||||
|
GRANT USAGE ON SCHEMA citus_schema_move TO regularuser;
|
||||||
|
|
||||||
|
SET ROLE regularuser;
|
||||||
|
|
||||||
|
SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport
|
||||||
|
FROM get_non_coord_candidate_node_for_schema_move('s2') \gset
|
||||||
|
|
||||||
|
SELECT citus_schema_move('s2', :s2_new_nodename, :s2_new_nodeport, 'force_logical');
|
||||||
|
|
||||||
|
SELECT (:s2_new_nodename, :s2_new_nodeport) = ALL(SELECT nodename, nodeport FROM citus_shards JOIN pg_dist_node USING (nodename, nodeport) WHERE noderole = 'primary' AND starts_with(table_name::text, 's2'::text));
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA s2 CASCADE;
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
REVOKE USAGE ON SCHEMA citus_schema_move FROM regularuser;
|
||||||
|
DROP ROLE regularuser, tenantuser;
|
||||||
|
|
||||||
|
RESET citus.enable_schema_based_sharding;
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA citus_schema_move, s1 CASCADE;
|
|
@ -146,6 +146,40 @@ DROP DATABASE test_db;
|
||||||
SELECT result FROM run_command_on_workers
|
SELECT result FROM run_command_on_workers
|
||||||
($$DROP DATABASE test_db$$);
|
($$DROP DATABASE test_db$$);
|
||||||
SET search_path TO pg16;
|
SET search_path TO pg16;
|
||||||
|
|
||||||
|
-- New rules option added to CREATE COLLATION
|
||||||
|
-- Similar to above test with CREATE DATABASE
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/30a53b7
|
||||||
|
|
||||||
|
CREATE COLLATION default_rule (provider = icu, locale = '');
|
||||||
|
CREATE COLLATION special_rule (provider = icu, locale = '', rules = '&a < g');
|
||||||
|
|
||||||
|
CREATE TABLE test_collation_rules (a text);
|
||||||
|
SELECT create_distributed_table('test_collation_rules', 'a');
|
||||||
|
INSERT INTO test_collation_rules VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
|
||||||
|
|
||||||
|
SELECT collname, collprovider, colliculocale, collicurules
|
||||||
|
FROM pg_collation
|
||||||
|
WHERE collname like '%_rule%'
|
||||||
|
ORDER BY 1;
|
||||||
|
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE default_rule;
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE special_rule;
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO pg16;
|
||||||
|
|
||||||
|
SELECT collname, collprovider, colliculocale, collicurules
|
||||||
|
FROM pg_collation
|
||||||
|
WHERE collname like '%_rule%'
|
||||||
|
ORDER BY 1;
|
||||||
|
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE default_rule;
|
||||||
|
SELECT * FROM test_collation_rules ORDER BY a COLLATE special_rule;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO pg16;
|
||||||
SET citus.next_shard_id TO 951000;
|
SET citus.next_shard_id TO 951000;
|
||||||
|
|
||||||
-- Foreign table TRUNCATE trigger
|
-- Foreign table TRUNCATE trigger
|
||||||
|
@ -319,6 +353,57 @@ SET citus.shard_replication_factor TO 1;
|
||||||
-- DEFAULT cannot be used in COPY TO
|
-- DEFAULT cannot be used in COPY TO
|
||||||
COPY (select 1 as test) TO stdout WITH (default '\D');
|
COPY (select 1 as test) TO stdout WITH (default '\D');
|
||||||
|
|
||||||
|
-- Tests for SQL/JSON: JSON_ARRAYAGG and JSON_OBJECTAGG aggregates
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/7081ac4
|
||||||
|
SET citus.next_shard_id TO 952000;
|
||||||
|
|
||||||
|
CREATE TABLE agg_test(a int, b serial);
|
||||||
|
SELECT create_distributed_table('agg_test', 'a');
|
||||||
|
INSERT INTO agg_test SELECT i FROM generate_series(1, 5) i;
|
||||||
|
|
||||||
|
-- JSON_ARRAYAGG with distribution key
|
||||||
|
SELECT JSON_ARRAYAGG(a ORDER BY a),
|
||||||
|
JSON_ARRAYAGG(a ORDER BY a RETURNING jsonb)
|
||||||
|
FROM agg_test;
|
||||||
|
|
||||||
|
-- JSON_ARRAYAGG with other column
|
||||||
|
SELECT JSON_ARRAYAGG(b ORDER BY b),
|
||||||
|
JSON_ARRAYAGG(b ORDER BY b RETURNING jsonb)
|
||||||
|
FROM agg_test;
|
||||||
|
|
||||||
|
-- JSON_ARRAYAGG with router query
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
SELECT JSON_ARRAYAGG(a ORDER BY a),
|
||||||
|
JSON_ARRAYAGG(a ORDER BY a RETURNING jsonb)
|
||||||
|
FROM agg_test WHERE a = 2;
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
|
||||||
|
-- JSON_OBJECTAGG with distribution key
|
||||||
|
SELECT
|
||||||
|
JSON_OBJECTAGG(a: a),
|
||||||
|
JSON_ARRAYAGG(a ORDER BY a), -- for order
|
||||||
|
JSON_OBJECTAGG(a: a RETURNING jsonb)
|
||||||
|
FROM
|
||||||
|
agg_test;
|
||||||
|
|
||||||
|
-- JSON_OBJECTAGG with other column
|
||||||
|
SELECT
|
||||||
|
JSON_OBJECTAGG(b: b),
|
||||||
|
JSON_ARRAYAGG(b ORDER BY b), -- for order
|
||||||
|
JSON_OBJECTAGG(b: b RETURNING jsonb)
|
||||||
|
FROM
|
||||||
|
agg_test;
|
||||||
|
|
||||||
|
-- JSON_OBJECTAGG with router query
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
SELECT
|
||||||
|
JSON_OBJECTAGG(a: a),
|
||||||
|
JSON_OBJECTAGG(a: a RETURNING jsonb)
|
||||||
|
FROM
|
||||||
|
agg_test WHERE a = 3;
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
|
||||||
-- Tests for SQL/JSON: support the IS JSON predicate
|
-- Tests for SQL/JSON: support the IS JSON predicate
|
||||||
-- Relevant PG commit:
|
-- Relevant PG commit:
|
||||||
-- https://github.com/postgres/postgres/commit/6ee30209
|
-- https://github.com/postgres/postgres/commit/6ee30209
|
||||||
|
@ -480,6 +565,32 @@ SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric
|
||||||
FROM dist_table
|
FROM dist_table
|
||||||
LEFT JOIN ref_table ON TRUE;
|
LEFT JOIN ref_table ON TRUE;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- PG16 added WITH ADMIN FALSE option to GRANT ROLE
|
||||||
|
-- WITH ADMIN FALSE is the default, make sure we propagate correctly in Citus
|
||||||
|
-- Relevant PG commit: https://github.com/postgres/postgres/commit/e3ce2de
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE ROLE role1;
|
||||||
|
CREATE ROLE role2;
|
||||||
|
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
SET citus.grep_remote_commands = '%GRANT%';
|
||||||
|
-- default admin option is false
|
||||||
|
GRANT role1 TO role2;
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
-- should behave same as default
|
||||||
|
GRANT role1 TO role2 WITH ADMIN FALSE;
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
-- with admin option and with admin true are the same
|
||||||
|
GRANT role1 TO role2 WITH ADMIN OPTION;
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
GRANT role1 TO role2 WITH ADMIN TRUE;
|
||||||
|
REVOKE role1 FROM role2;
|
||||||
|
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
RESET citus.grep_remote_commands;
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP EXTENSION postgres_fdw CASCADE;
|
DROP EXTENSION postgres_fdw CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue