Handle dropping the partitioned tables properly

Before this commit, we might be leaving some metadata on the workers.
Now, we handle DROP SCHEMA .. CASCADE properly to avoid any metadata
leakage.
pull/5762/head
Onder Kalaci 2022-03-04 10:56:07 +01:00
parent 3801576dfb
commit 24fcd2a88c
6 changed files with 245 additions and 24 deletions

View File

@ -11,11 +11,13 @@
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
@ -123,6 +125,10 @@ master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS)
* The function is a no-op for non-distributed tables and clusters that don't * The function is a no-op for non-distributed tables and clusters that don't
* have any workers with metadata. Also, the function errors out if called * have any workers with metadata. Also, the function errors out if called
* from a worker node. * from a worker node.
*
* This function assumed that it is called via a trigger. But we cannot do the
* typical CALLED_AS_TRIGGER check because this is called via another trigger,
* which CALLED_AS_TRIGGER does not cover.
*/ */
static void static void
MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName, MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName,
@ -146,6 +152,16 @@ MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName
return; return;
} }
if (PartitionTable(relationId))
{
/*
* MasterRemoveDistributedTableMetadataFromWorkers is only called from drop trigger.
* When parent is dropped in a drop trigger, we remove all the corresponding
* partitions via the parent, mostly for performance reasons.
*/
return;
}
/* drop the distributed table metadata on the workers */ /* drop the distributed table metadata on the workers */
char *deleteDistributionCommand = DistributionDeleteCommand(schemaName, tableName); char *deleteDistributionCommand = DistributionDeleteCommand(schemaName, tableName);
SendCommandToWorkersWithMetadata(deleteDistributionCommand); SendCommandToWorkersWithMetadata(deleteDistributionCommand);

View File

@ -29,6 +29,7 @@
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata/distobject.h" #include "distributed/metadata/distobject.h"
#include "distributed/multi_partitioning_utils.h"
#include "foreign/foreign.h" #include "foreign/foreign.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
@ -37,12 +38,13 @@ PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
PG_FUNCTION_INFO_V1(worker_drop_shell_table); PG_FUNCTION_INFO_V1(worker_drop_shell_table);
PG_FUNCTION_INFO_V1(worker_drop_sequence_dependency); PG_FUNCTION_INFO_V1(worker_drop_sequence_dependency);
static void WorkerDropDistributedTable(Oid relationId);
#if PG_VERSION_NUM < PG_VERSION_13 #if PG_VERSION_NUM < PG_VERSION_13
static long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, static long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype,
Oid refclassId, Oid refobjectId); Oid refclassId, Oid refobjectId);
#endif #endif
/* /*
* worker_drop_distributed_table drops the distributed table with the given oid, * worker_drop_distributed_table drops the distributed table with the given oid,
* then, removes the associated rows from pg_dist_partition, pg_dist_shard and * then, removes the associated rows from pg_dist_partition, pg_dist_shard and
@ -64,8 +66,6 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
text *relationName = PG_GETARG_TEXT_P(0); text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName, true); Oid relationId = ResolveRelationId(relationName, true);
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
if (!OidIsValid(relationId)) if (!OidIsValid(relationId))
{ {
ereport(NOTICE, (errmsg("relation %s does not exist, skipping", ereport(NOTICE, (errmsg("relation %s does not exist, skipping",
@ -75,8 +75,45 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
EnsureTableOwner(relationId); EnsureTableOwner(relationId);
List *shardList = LoadShardList(relationId); if (PartitionedTable(relationId))
{
/*
* When "DROP SCHEMA .. CASCADE" happens, we rely on Postgres' drop trigger
* to send the individual DROP TABLE commands for tables.
*
* In case of partitioned tables, we have no control on the order of DROP
* commands that is sent to the extension. We can try to sort while processing
* on the coordinator, but we prefer to handle it in a more flexible manner.
*
* That's why, whenever we see a partitioned table, we drop all the corresponding
* partitions first. Otherwise, WorkerDropDistributedTable() would already drop
* the shell tables of the partitions (e.g., due to performDeletion(..CASCADE),
* and further WorkerDropDistributedTable() on the partitions would become no-op.
*
* If, say one partition has already been dropped earlier, that should also be fine
* because we read the existing partitions.
*/
List *partitionList = PartitionList(relationId);
Oid partitionOid = InvalidOid;
foreach_oid(partitionOid, partitionList)
{
WorkerDropDistributedTable(partitionOid);
}
}
WorkerDropDistributedTable(relationId);
PG_RETURN_VOID();
}
/*
* WorkerDropDistributedTable is a helper function for worker_drop_distributed_table, see
* tha function for the details.
*/
static void
WorkerDropDistributedTable(Oid relationId)
{
/* first check the relation type */ /* first check the relation type */
Relation distributedRelation = relation_open(relationId, AccessShareLock); Relation distributedRelation = relation_open(relationId, AccessShareLock);
@ -86,9 +123,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
relation_close(distributedRelation, AccessShareLock); relation_close(distributedRelation, AccessShareLock);
/* prepare distributedTableObject for dropping the table */ /* prepare distributedTableObject for dropping the table */
distributedTableObject.classId = RelationRelationId; ObjectAddress distributedTableObject = { RelationRelationId, relationId, 0 };
distributedTableObject.objectId = relationId;
distributedTableObject.objectSubId = 0;
/* Drop dependent sequences from pg_dist_object */ /* Drop dependent sequences from pg_dist_object */
#if PG_VERSION_NUM >= PG_VERSION_13 #if PG_VERSION_NUM >= PG_VERSION_13
@ -121,6 +156,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
} }
/* iterate over shardList to delete the corresponding rows */ /* iterate over shardList to delete the corresponding rows */
List *shardList = LoadShardList(relationId);
uint64 *shardIdPointer = NULL; uint64 *shardIdPointer = NULL;
foreach_ptr(shardIdPointer, shardList) foreach_ptr(shardIdPointer, shardList)
{ {
@ -140,8 +176,6 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
/* delete the row from pg_dist_partition */ /* delete the row from pg_dist_partition */
DeletePartitionRow(relationId); DeletePartitionRow(relationId);
PG_RETURN_VOID();
} }

View File

@ -395,3 +395,117 @@ NOTICE: issuing ROLLBACK
DROP SCHEMA drop_partitioned_table CASCADE; DROP SCHEMA drop_partitioned_table CASCADE;
NOTICE: drop cascades to 3 other objects NOTICE: drop cascades to 3 other objects
SET search_path TO public; SET search_path TO public;
-- dropping the schema should drop the metadata on the workers
CREATE SCHEMA partitioning_schema;
SET search_path TO partitioning_schema;
CREATE TABLE part_table (
col timestamp
) PARTITION BY RANGE (col);
CREATE TABLE part_table_1
PARTITION OF part_table
FOR VALUES FROM ('2010-01-01') TO ('2015-01-01');
SELECT create_distributed_table('part_table', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- show we have pg_dist_partition entries on the workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,2)
(localhost,57638,t,2)
(2 rows)
-- show we have pg_dist_object entries on the workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,2)
(localhost,57638,t,2)
(2 rows)
DROP SCHEMA partitioning_schema CASCADE;
NOTICE: drop cascades to table part_table
-- show we don't have pg_dist_partition entries on the workers after dropping the schema
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- show we don't have pg_dist_object entries on the workers after dropping the schema
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- dropping the parent should drop the metadata on the workers
CREATE SCHEMA partitioning_schema;
SET search_path TO partitioning_schema;
CREATE TABLE part_table (
col timestamp
) PARTITION BY RANGE (col);
CREATE TABLE part_table_1
PARTITION OF part_table
FOR VALUES FROM ('2010-01-01') TO ('2015-01-01');
SELECT create_distributed_table('part_table', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE part_table;
-- show we don't have pg_dist_partition entries on the workers after dropping the parent
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- show we don't have pg_dist_object entries on the workers after dropping the parent
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
SET search_path TO partitioning_schema;
CREATE TABLE part_table (
col timestamp
) PARTITION BY RANGE (col);
CREATE TABLE part_table_1
PARTITION OF part_table
FOR VALUES FROM ('2010-01-01') TO ('2015-01-01');
SELECT create_distributed_table('part_table', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE part_table_1;
-- show we have pg_dist_partition entries for the parent on the workers after dropping the partition
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
-- show we have pg_dist_object entries for the parent on the workers after dropping the partition
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
-- clean-up
DROP SCHEMA partitioning_schema CASCADE;
NOTICE: drop cascades to table part_table

View File

@ -61,16 +61,6 @@ CREATE FUNCTION find_shard_interval_index(bigint)
RETURNS int RETURNS int
AS 'citus' AS 'citus'
LANGUAGE C STRICT; LANGUAGE C STRICT;
-- remove tables from pg_dist_partition, if they don't exist i.e not found in pg_class
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);
select 1 from run_command_on_workers($$
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
-- =================================================================== -- ===================================================================
-- test co-location util functions -- test co-location util functions
-- =================================================================== -- ===================================================================

View File

@ -240,3 +240,75 @@ ROLLBACK;
DROP SCHEMA drop_partitioned_table CASCADE; DROP SCHEMA drop_partitioned_table CASCADE;
SET search_path TO public; SET search_path TO public;
-- dropping the schema should drop the metadata on the workers
CREATE SCHEMA partitioning_schema;
SET search_path TO partitioning_schema;
CREATE TABLE part_table (
col timestamp
) PARTITION BY RANGE (col);
CREATE TABLE part_table_1
PARTITION OF part_table
FOR VALUES FROM ('2010-01-01') TO ('2015-01-01');
SELECT create_distributed_table('part_table', 'col');
-- show we have pg_dist_partition entries on the workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
-- show we have pg_dist_object entries on the workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
DROP SCHEMA partitioning_schema CASCADE;
-- show we don't have pg_dist_partition entries on the workers after dropping the schema
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
-- show we don't have pg_dist_object entries on the workers after dropping the schema
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
-- dropping the parent should drop the metadata on the workers
CREATE SCHEMA partitioning_schema;
SET search_path TO partitioning_schema;
CREATE TABLE part_table (
col timestamp
) PARTITION BY RANGE (col);
CREATE TABLE part_table_1
PARTITION OF part_table
FOR VALUES FROM ('2010-01-01') TO ('2015-01-01');
SELECT create_distributed_table('part_table', 'col');
DROP TABLE part_table;
-- show we don't have pg_dist_partition entries on the workers after dropping the parent
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
-- show we don't have pg_dist_object entries on the workers after dropping the parent
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
SET search_path TO partitioning_schema;
CREATE TABLE part_table (
col timestamp
) PARTITION BY RANGE (col);
CREATE TABLE part_table_1
PARTITION OF part_table
FOR VALUES FROM ('2010-01-01') TO ('2015-01-01');
SELECT create_distributed_table('part_table', 'col');
DROP TABLE part_table_1;
-- show we have pg_dist_partition entries for the parent on the workers after dropping the partition
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_partition where exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid AND relname ILIKE '%part_table%');$$);
-- show we have pg_dist_object entries for the parent on the workers after dropping the partition
SELECT run_command_on_workers($$SELECT count(*) FROM pg_dist_object as obj where classid = 1259 AND exists(select * from pg_class where pg_class.oid=obj.objid AND relname ILIKE '%part_table%');$$);
-- clean-up
DROP SCHEMA partitioning_schema CASCADE;

View File

@ -66,11 +66,6 @@ CREATE FUNCTION find_shard_interval_index(bigint)
AS 'citus' AS 'citus'
LANGUAGE C STRICT; LANGUAGE C STRICT;
-- remove tables from pg_dist_partition, if they don't exist i.e not found in pg_class
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);
select 1 from run_command_on_workers($$
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);$$);
-- =================================================================== -- ===================================================================
-- test co-location util functions -- test co-location util functions
-- =================================================================== -- ===================================================================