Handle MX tables on workers during drop table commands

pull/1045/head
Eren Basak 2016-12-20 12:23:30 +03:00
parent bed2e353db
commit 31af40cc26
13 changed files with 299 additions and 92 deletions
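
This commit makes DROP TABLE on the coordinator clean up MX (metadata-synced) tables on the workers as well: the drop trigger now deletes the coordinator's pg_dist_partition entry and propagates the removal to every worker that holds the table's metadata, so the regression tests no longer drop the tables on each worker by hand. A minimal sketch of the resulting behavior (table name and worker port are illustrative, assuming a metadata-synced worker):

SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_drop_demo (a int);
SELECT create_distributed_table('mx_drop_demo', 'a');
SELECT start_metadata_sync_to_node('localhost', 9701);
-- Dropping on the coordinator now also removes the table and its
-- metadata on the metadata-synced worker.
DROP TABLE mx_drop_demo;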

View File

@@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
-6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8
+6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -111,6 +111,8 @@ $(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql
	cat $^ > $@
$(EXTENSION)--6.1-8.sql: $(EXTENSION)--6.1-7.sql $(EXTENSION)--6.1-7--6.1-8.sql
	cat $^ > $@
+$(EXTENSION)--6.1-9.sql: $(EXTENSION)--6.1-8.sql $(EXTENSION)--6.1-8--6.1-9.sql
+	cat $^ > $@

NO_PGXS = 1

View File

@@ -0,0 +1,89 @@
/* citus--6.1-8--6.1-9.sql */

SET search_path = 'pg_catalog';

CREATE FUNCTION master_drop_distributed_table_metadata(logicalrelid regclass,
                                                       schema_name text,
                                                       table_name text)
    RETURNS void
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$master_drop_distributed_table_metadata$$;
COMMENT ON FUNCTION master_drop_distributed_table_metadata(logicalrelid regclass,
                                                           schema_name text,
                                                           table_name text)
    IS 'delete metadata of the distributed table';

CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
    RETURNS event_trigger
    LANGUAGE plpgsql
    SECURITY DEFINER
    SET search_path = pg_catalog
AS $cdbdt$
DECLARE
    v_obj record;
    sequence_names text[] := '{}';
    node_names text[] := '{}';
    node_ports bigint[] := '{}';
    node_name text;
    node_port bigint;
    table_colocation_id integer;
BEGIN
    -- collect set of dropped sequences to drop on workers later
    SELECT array_agg(object_identity) INTO sequence_names
    FROM pg_event_trigger_dropped_objects()
    WHERE object_type = 'sequence';

    -- Must accumulate set of affected nodes before deleting placements, as
    -- master_drop_all_shards will erase their rows, making it impossible for
    -- us to know where to drop sequences (which must be dropped after shards,
    -- since they have default value expressions which depend on sequences).
    SELECT array_agg(sp.nodename), array_agg(sp.nodeport)
    INTO node_names, node_ports
    FROM pg_event_trigger_dropped_objects() AS dobj,
         pg_dist_shard AS s,
         pg_dist_shard_placement AS sp
    WHERE dobj.object_type IN ('table', 'foreign table')
      AND dobj.objid = s.logicalrelid
      AND s.shardid = sp.shardid;

    FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP
        IF v_obj.object_type NOT IN ('table', 'foreign table') THEN
            CONTINUE;
        END IF;

        -- nothing to do if not a distributed table
        IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN
            CONTINUE;
        END IF;

        -- get colocation group
        SELECT colocationid INTO table_colocation_id
        FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;

        -- ensure all shards are dropped
        PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
        PERFORM master_drop_distributed_table_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name);

        -- drop colocation group if all referencing tables are dropped
        IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN
            DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id;
        END IF;
    END LOOP;

    IF cardinality(sequence_names) = 0 THEN
        RETURN;
    END IF;

    FOR node_name, node_port IN
        SELECT DISTINCT name, port
        FROM unnest(node_names, node_ports) AS nodes(name, port)
    LOOP
        PERFORM master_drop_sequences(sequence_names, node_name, node_port);
    END LOOP;
END;
$cdbdt$;
COMMENT ON FUNCTION citus_drop_trigger()
    IS 'perform checks and actions at the end of DROP actions';

RESET search_path;
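
master_drop_distributed_table_metadata is normally invoked by citus_drop_trigger as shown above, but it is an ordinary UDF; a hypothetical manual invocation (relation and names illustrative) mirrors the two calls the trigger performs per dropped table:

SELECT master_drop_all_shards('public.mx_drop_demo'::regclass, 'public', 'mx_drop_demo');
SELECT master_drop_distributed_table_metadata('public.mx_drop_demo'::regclass,
                                              'public', 'mx_drop_demo');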

View File

@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
-default_version = '6.1-8'
+default_version = '6.1-9'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@@ -0,0 +1,56 @@
/*-------------------------------------------------------------------------
*
* drop_distributed_table.c
* Routines related to dropping distributed relations from a trigger.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_drop_distributed_table_metadata);
/*
 * master_drop_distributed_table_metadata removes the entry of the given
 * distributed table from pg_dist_partition and, if the table's metadata is
 * synced to workers, propagates the removal to those workers as well.
 */
Datum
master_drop_distributed_table_metadata(PG_FUNCTION_ARGS)
{
    Oid relationId = PG_GETARG_OID(0);
    text *schemaNameText = PG_GETARG_TEXT_P(1);
    text *tableNameText = PG_GETARG_TEXT_P(2);
    bool shouldSyncMetadata = false;

    char *schemaName = text_to_cstring(schemaNameText);
    char *tableName = text_to_cstring(tableNameText);

    CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName);

    DeletePartitionRow(relationId);

    shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
    if (shouldSyncMetadata)
    {
        char *deleteDistributionCommand = NULL;

        /* drop the distributed table metadata on the workers */
        deleteDistributionCommand = DistributionDeleteCommand(schemaName, tableName);
        SendCommandToWorkers(WORKERS_WITH_METADATA, deleteDistributionCommand);
    }

    PG_RETURN_VOID();
}
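
Assuming a metadata-synced worker, the effect of the propagation above can be checked from the worker, much as the regression tests below do (table name illustrative):

\c - - - :worker_1_port
\d mx_drop_demo
-- no output: the worker's table was dropped together with its metadata
SELECT count(*) FROM pg_dist_partition WHERE logicalrelid::text = 'mx_drop_demo';
-- expect 0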

View File

@@ -196,45 +196,16 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
    text *schemaNameText = PG_GETARG_TEXT_P(1);
    text *relationNameText = PG_GETARG_TEXT_P(2);

-   char *schemaName = NULL;
-   char *relationName = NULL;
    bool isTopLevel = true;
    List *shardIntervalList = NIL;
    int droppedShardCount = 0;

+   char *schemaName = text_to_cstring(schemaNameText);
+   char *relationName = text_to_cstring(relationNameText);

    PreventTransactionChain(isTopLevel, "DROP distributed table");

-   relationName = get_rel_name(relationId);
-   if (relationName != NULL)
-   {
-       /* ensure proper values are used if the table exists */
-       Oid schemaId = get_rel_namespace(relationId);
-       schemaName = get_namespace_name(schemaId);
-
-       /*
-        * Only allow the owner to drop all shards, this is more akin to DDL
-        * than DELETE.
-        */
-       EnsureTableOwner(relationId);
-   }
-   else
-   {
-       /* table has been dropped, rely on user-supplied values */
-       schemaName = text_to_cstring(schemaNameText);
-       relationName = text_to_cstring(relationNameText);
-
-       /*
-        * Verify that this only is run as superuser - that's how it's used in
-        * our drop event trigger, and we can't verify permissions for an
-        * already dropped relation.
-        */
-       if (!superuser())
-       {
-           ereport(ERROR, (errmsg("cannot drop all shards of a dropped table as "
-                                  "non-superuser")));
-       }
-   }
+   CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);

    shardIntervalList = LoadShardIntervalList(relationId);
    droppedShardCount = DropShards(relationId, schemaName, relationName,
@@ -299,6 +270,35 @@ master_drop_sequences(PG_FUNCTION_ARGS)
}

+/*
+ * CheckTableSchemaNameForDrop errors out if the current user does not have
+ * permission to undistribute the given relation, taking into account that it
+ * may be called from the drop trigger. If the relation still exists, the
+ * function overwrites the given schema and table names with the values from
+ * the system catalogs.
+ */
+void
+CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, char **tableName)
+{
+   char *tempTableName = get_rel_name(relationId);
+
+   if (tempTableName != NULL)
+   {
+       /* ensure proper values are used if the table exists */
+       Oid schemaId = get_rel_namespace(relationId);
+
+       (*schemaName) = get_namespace_name(schemaId);
+       (*tableName) = tempTableName;
+
+       EnsureTableOwner(relationId);
+   }
+   else if (!superuser())
+   {
+       /* table does not exist, must be called from drop trigger */
+       ereport(ERROR, (errmsg("cannot drop distributed table metadata as a "
+                              "non-superuser")));
+   }
+}

/*
 * DropShards drops all given shards in a relation. The id, name and schema
 * for the relation are explicitly provided, since this function may be
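
CheckTableSchemaNameForDrop covers two call paths. A hypothetical illustration (names illustrative): when the relation still exists, the caller must own it and any user-supplied names are recomputed from the catalogs, so even empty strings are safe; when the relation is already gone (the drop-trigger path), the names cannot be verified, so only a superuser may proceed with the values as given.

-- Existing table: the owner check applies, and the empty name arguments
-- are overwritten from the system catalogs by CheckTableSchemaNameForDrop.
SELECT master_drop_all_shards('mx_drop_demo'::regclass, '', '');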

View File

@@ -515,6 +515,49 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
}

+/*
+ * DeletePartitionRow removes the row from pg_dist_partition where the
+ * logicalrelid field equals distributedRelationId. Then, the function
+ * invalidates the metadata cache.
+ */
+void
+DeletePartitionRow(Oid distributedRelationId)
+{
+   Relation pgDistPartition = NULL;
+   HeapTuple heapTuple = NULL;
+   SysScanDesc scanDescriptor = NULL;
+   ScanKeyData scanKey[1];
+   int scanKeyCount = 1;
+
+   pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
+
+   ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
+               BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
+
+   scanDescriptor = systable_beginscan(pgDistPartition, InvalidOid, false, NULL,
+                                       scanKeyCount, scanKey);
+
+   heapTuple = systable_getnext(scanDescriptor);
+   if (!HeapTupleIsValid(heapTuple))
+   {
+       ereport(ERROR, (errmsg("could not find valid entry for partition %d",
+                              distributedRelationId)));
+   }
+
+   simple_heap_delete(pgDistPartition, &heapTuple->t_self);
+   systable_endscan(scanDescriptor);
+
+   /* invalidate the cache */
+   CitusInvalidateRelcacheByRelid(distributedRelationId);
+
+   /* increment the command counter so that subsequent commands see the deletion */
+   CommandCounterIncrement();
+
+   heap_close(pgDistPartition, RowExclusiveLock);
+}

/*
 * DeleteShardRow opens the shard system catalog, finds the unique row that has
 * the given shardId, and deletes this row.
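
In SQL terms, the new DeletePartitionRow is roughly the following sketch (table name illustrative); the C version additionally invalidates the relcache entry for the relation and increments the command counter so that later commands in the same transaction see the deletion:

DELETE FROM pg_catalog.pg_dist_partition
WHERE logicalrelid = 'mx_drop_demo'::regclass;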

View File

@@ -29,9 +29,6 @@
PG_FUNCTION_INFO_V1(worker_drop_distributed_table);

-static void DeletePartitionRow(Oid distributedRelationId);
/*
* worker_drop_distributed_table drops the distributed table with the given oid,
* then, removes the associated rows from pg_dist_partition, pg_dist_shard and
@@ -133,46 +130,3 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
-/*
- * DeletePartitionRow removes the row from pg_dist_partition where the logicalrelid
- * field equals to distributedRelationId. Then, the function invalidates the
- * metadata cache.
- */
-void
-DeletePartitionRow(Oid distributedRelationId)
-{
-   Relation pgDistPartition = NULL;
-   HeapTuple heapTuple = NULL;
-   SysScanDesc scanDescriptor = NULL;
-   ScanKeyData scanKey[1];
-   int scanKeyCount = 1;
-
-   pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
-
-   ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
-               BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
-
-   scanDescriptor = systable_beginscan(pgDistPartition, InvalidOid, false, NULL,
-                                       scanKeyCount, scanKey);
-
-   heapTuple = systable_getnext(scanDescriptor);
-   if (!HeapTupleIsValid(heapTuple))
-   {
-       ereport(ERROR, (errmsg("could not find valid entry for partition %d",
-                              distributedRelationId)));
-   }
-
-   simple_heap_delete(pgDistPartition, &heapTuple->t_self);
-   systable_endscan(scanDescriptor);
-
-   /* invalidate the cache */
-   CitusInvalidateRelcacheByRelid(distributedRelationId);
-
-   /* increment the counter so that next command can see the row */
-   CommandCounterIncrement();
-
-   heap_close(pgDistPartition, RowExclusiveLock);
-}

View File

@@ -76,6 +76,7 @@ extern void DeleteShardRow(uint64 shardId);
extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
char *nodeName, uint32 nodePort);
+extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32

View File

@@ -114,6 +114,8 @@ extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
List *ddlCommandList, List *foreignConstraintCommandList);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId);
+extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,
+                                        char **tableName);
/* Function declarations for generating metadata for shard and placement creation */
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);

View File

@@ -66,6 +66,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6';
ALTER EXTENSION citus UPDATE TO '6.1-7';
ALTER EXTENSION citus UPDATE TO '6.1-8';
+ALTER EXTENSION citus UPDATE TO '6.1-9';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,

View File

@@ -288,8 +288,6 @@ Table "mx_testing_schema_2.fk_test_2"
Foreign-key constraints:
"fk_test_2_col1_fkey" FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3)
-DROP TABLE mx_testing_schema_2.fk_test_2;
-DROP TABLE mx_testing_schema.fk_test_1;
\c - - - :master_port
DROP TABLE mx_testing_schema_2.fk_test_2;
DROP TABLE mx_testing_schema.fk_test_1;
@@ -435,8 +433,6 @@ SELECT * FROM mx_query_test ORDER BY a;
6 | six | 36
(6 rows)
-\c - - - :worker_1_port
-DROP TABLE mx_query_test;
\c - - - :master_port
DROP TABLE mx_query_test;
-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false
@@ -824,7 +820,47 @@ WHERE
(2 rows)
\c - - - :master_port
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
+-- Check that DROP TABLE on MX tables works
+DROP TABLE mx_colocation_test_1;
+DROP TABLE mx_colocation_test_2;
+\d mx_colocation_test_1
+\d mx_colocation_test_2
+\c - - - :worker_1_port
+\d mx_colocation_test_1
+\d mx_colocation_test_2
+-- Check that dropped MX table can be recreated again
+\c - - - :master_port
+SET citus.shard_count TO 7;
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE mx_temp_drop_test (a int);
+SELECT create_distributed_table('mx_temp_drop_test', 'a');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass;
+   logicalrelid    | repmodel
+-------------------+----------
+ mx_temp_drop_test | s
+(1 row)
+
+DROP TABLE mx_temp_drop_test;
+CREATE TABLE mx_temp_drop_test (a int);
+SELECT create_distributed_table('mx_temp_drop_test', 'a');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass;
+   logicalrelid    | repmodel
+-------------------+----------
+ mx_temp_drop_test | s
+(1 row)
+
+DROP TABLE mx_temp_drop_test;
-- Cleanup
\c - - - :worker_1_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
@@ -856,4 +892,5 @@ DROP TABLE mx_testing_schema.mx_test_table;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.multi_shard_commit_protocol;
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;

View File

@@ -66,6 +66,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6';
ALTER EXTENSION citus UPDATE TO '6.1-7';
ALTER EXTENSION citus UPDATE TO '6.1-8';
+ALTER EXTENSION citus UPDATE TO '6.1-9';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)

View File

@@ -103,13 +103,10 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
\c - - - :worker_1_port
\d mx_testing_schema_2.fk_test_2
-DROP TABLE mx_testing_schema_2.fk_test_2;
-DROP TABLE mx_testing_schema.fk_test_1;
\c - - - :master_port
DROP TABLE mx_testing_schema_2.fk_test_2;
DROP TABLE mx_testing_schema.fk_test_1;
RESET citus.shard_replication_factor;
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
@@ -157,8 +154,6 @@ UPDATE mx_query_test SET c = 25 WHERE a = 5;
\c - - - :master_port
SELECT * FROM mx_query_test ORDER BY a;
-\c - - - :worker_1_port
-DROP TABLE mx_query_test;
\c - - - :master_port
DROP TABLE mx_query_test;
@@ -357,8 +352,33 @@ WHERE
OR logicalrelid = 'mx_colocation_test_2'::regclass;
\c - - - :master_port
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
+-- Check that DROP TABLE on MX tables works
+DROP TABLE mx_colocation_test_1;
+DROP TABLE mx_colocation_test_2;
+\d mx_colocation_test_1
+\d mx_colocation_test_2
+\c - - - :worker_1_port
+\d mx_colocation_test_1
+\d mx_colocation_test_2
+
+-- Check that dropped MX table can be recreated again
+\c - - - :master_port
+SET citus.shard_count TO 7;
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE mx_temp_drop_test (a int);
+SELECT create_distributed_table('mx_temp_drop_test', 'a');
+SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass;
+DROP TABLE mx_temp_drop_test;
+
+CREATE TABLE mx_temp_drop_test (a int);
+SELECT create_distributed_table('mx_temp_drop_test', 'a');
+SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass;
+DROP TABLE mx_temp_drop_test;
-- Cleanup
\c - - - :worker_1_port
@@ -381,4 +401,5 @@ RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.multi_shard_commit_protocol;
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;