pull/6687/merge
Hanefi Onaldi 2025-06-24 10:25:15 -07:00 committed by GitHub
commit 7d23e70dd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 65 additions and 2 deletions

View File

@ -1308,6 +1308,39 @@ LookupNodeForGroup(int32 groupId)
} }
/*
* ShardPlacementListViaCatalog returns the list of placements for the given shard from
* the cache.
*
* This function is intended for DROP operations can potentially drop placements, and
* therefore invalidate caches for placements. We continue accessing caches for worker
* nodes, and expect that they will not get invalidated by a concurrent process.
*/
List *
ShardPlacementListViaCatalog(uint64 shardId)
{
List *groupShardPlacementList = BuildShardPlacementList(shardId);
List *shardPlacementList = NIL;
GroupShardPlacement *groupShardPlacement = NULL;
foreach_ptr(groupShardPlacement, groupShardPlacementList)
{
WorkerNode *worker = LookupNodeForGroup(groupShardPlacement->groupId);
ShardPlacement *placement = CitusMakeNode(ShardPlacement);
placement->shardId = groupShardPlacement->shardId;
placement->shardLength = groupShardPlacement->shardLength;
placement->nodeId = worker->nodeId;
placement->nodeName = pstrdup(worker->workerName);
placement->nodePort = worker->workerPort;
placement->placementId = groupShardPlacement->placementId;
shardPlacementList = lappend(shardPlacementList, placement);
}
return SortList(shardPlacementList, CompareShardPlacements);
}
/* /*
* ShardPlacementList returns the list of placements for the given shard from * ShardPlacementList returns the list of placements for the given shard from
* the cache. * the cache.

View File

@ -1550,7 +1550,7 @@ ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId)
/* /*
* ActiveShardPlacementList finds shard placements for the given shardId from * ActiveShardPlacementList finds shard placements for the given shardId from
* system catalogs, chooses placements that are in active state, and returns * metadata cache, chooses placements that are in active state, and returns
* these shard placements in a new list. * these shard placements in a new list.
*/ */
List * List *

View File

@ -374,7 +374,7 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName,
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = ShardPlacementList(shardId); task->taskPlacementList = ShardPlacementListViaCatalog(shardId);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
} }

View File

@ -185,6 +185,7 @@ extern int32 GetLocalNodeId(void);
extern void CitusTableCacheFlushInvalidatedEntries(void); extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok); extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId); extern List * ShardPlacementList(uint64 shardId);
extern List * ShardPlacementListViaCatalog(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern void InvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraph(void);

View File

@ -605,6 +605,22 @@ DETAIL: "type temp_type" will be created only locally
CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three'); CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three');
WARNING: "type temp_enum" has dependency on unsupported object "schema pg_temp_xxx" WARNING: "type temp_enum" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "type temp_enum" will be created only locally DETAIL: "type temp_enum" will be created only locally
-- check that dropping a schema that has a type used in a distribution column does not fail
CREATE SCHEMA schema_with_custom_distribution_type;
SET search_path TO schema_with_custom_distribution_type;
CREATE TYPE my_type AS (int_field int);
CREATE TABLE tbl (a schema_with_custom_distribution_type.my_type);
SELECT create_distributed_table('tbl', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
drop schema schema_with_custom_distribution_type cascade;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to type my_type
drop cascades to table tbl
SET search_path TO type_tests;
-- clear objects -- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE; DROP SCHEMA type_tests CASCADE;

View File

@ -361,6 +361,19 @@ SELECT create_distributed_table('table_text_local_def','id');
CREATE TYPE pg_temp.temp_type AS (int_field int); CREATE TYPE pg_temp.temp_type AS (int_field int);
CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three'); CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three');
-- check that dropping a schema that has a type used in a distribution column does not fail
CREATE SCHEMA schema_with_custom_distribution_type;
SET search_path TO schema_with_custom_distribution_type;
CREATE TYPE my_type AS (int_field int);
CREATE TABLE tbl (a schema_with_custom_distribution_type.my_type);
SELECT create_distributed_table('tbl', 'a');
drop schema schema_with_custom_distribution_type cascade;
SET search_path TO type_tests;
-- clear objects -- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE; DROP SCHEMA type_tests CASCADE;