Fix distributing tables owned by extensions

pull/3938/head
Marco Slot 2020-04-27 11:46:36 +02:00
parent 9a56c22917
commit bd12555b16
8 changed files with 196 additions and 27 deletions

View File

@ -27,6 +27,7 @@
#include "catalog/pg_attribute.h" #include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h" #include "catalog/pg_enum.h"
#include "catalog/pg_extension.h" #include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h" #include "catalog/pg_opclass.h"
#if PG_VERSION_NUM >= 12000 #if PG_VERSION_NUM >= 12000
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
@ -156,7 +157,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
bool viaDeprecatedAPI = true; bool viaDeprecatedAPI = true;
ObjectAddress tableAddress = { 0 }; ObjectAddress tableAddress = { 0 };
/* /*
* distributed tables might have dependencies on different objects, since we create * distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created * shards for a distributed table via multiple sessions these objects will be created
@ -683,6 +683,13 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
TupleDesc relationDesc = RelationGetDescr(relation); TupleDesc relationDesc = RelationGetDescr(relation);
char *relationName = RelationGetRelationName(relation); char *relationName = RelationGetRelationName(relation);
if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute catalog tables")));
}
if (!RelationUsesHeapAccessMethodOrNone(relation)) if (!RelationUsesHeapAccessMethodOrNone(relation))
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(

View File

@ -631,6 +631,20 @@ SupportedDependencyByCitus(const ObjectAddress *address)
} }
/*
* IsTableOwnedByExtension returns whether the table with the given relation ID is
* owned by an extension.
*/
bool
IsTableOwnedByExtension(Oid relationId)
{
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
return IsObjectAddressOwnedByExtension(&tableAddress, NULL);
}
/* /*
* IsObjectAddressOwnedByExtension returns whether or not the object is owned by an * IsObjectAddressOwnedByExtension returns whether or not the object is owned by an
* extension. It is assumed that an object having a dependency on an extension is created * extension. It is assumed that an object having a dependency on an extension is created

View File

@ -36,6 +36,7 @@
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
@ -54,6 +55,7 @@
#include "utils/syscache.h" #include "utils/syscache.h"
static List * GetDistributedTableDDLEvents(Oid relationId);
static char * LocalGroupIdUpdateCommand(int32 groupId); static char * LocalGroupIdUpdateCommand(int32 groupId);
static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort, static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort,
int attrNum, bool value); int attrNum, bool value);
@ -384,6 +386,12 @@ MetadataCreateCommands(void)
Oid relationId = cacheEntry->relationId; Oid relationId = cacheEntry->relationId;
ObjectAddress tableAddress = { 0 }; ObjectAddress tableAddress = { 0 };
if (IsTableOwnedByExtension(relationId))
{
/* skip table creation when the Citus table is owned by an extension */
continue;
}
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults); List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
@ -407,8 +415,16 @@ MetadataCreateCommands(void)
/* construct the foreign key constraints after all tables are created */ /* construct the foreign key constraints after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList) foreach_ptr(cacheEntry, propagatedTableList)
{ {
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip foreign key creation when the Citus table is owned by an extension */
continue;
}
List *foreignConstraintCommands = List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(cacheEntry->relationId); GetReferencingForeignConstaintCommands(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
foreignConstraintCommands); foreignConstraintCommands);
@ -417,10 +433,18 @@ MetadataCreateCommands(void)
/* construct partitioning hierarchy after all tables are created */ /* construct partitioning hierarchy after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList) foreach_ptr(cacheEntry, propagatedTableList)
{ {
if (PartitionTable(cacheEntry->relationId)) Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip partition creation when the Citus table is owned by an extension */
continue;
}
if (PartitionTable(relationId))
{ {
char *alterTableAttachPartitionCommands = char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(cacheEntry->relationId); GenerateAlterTableAttachPartitionCommand(relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
alterTableAttachPartitionCommands); alterTableAttachPartitionCommands);
@ -461,7 +485,7 @@ MetadataCreateCommands(void)
* sequences, setting the owner of the table, inserting table and shard metadata, * sequences, setting the owner of the table, inserting table and shard metadata,
* setting the truncate trigger and foreign key constraints. * setting the truncate trigger and foreign key constraints.
*/ */
List * static List *
GetDistributedTableDDLEvents(Oid relationId) GetDistributedTableDDLEvents(Oid relationId)
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
@ -469,6 +493,10 @@ GetDistributedTableDDLEvents(Oid relationId)
List *commandList = NIL; List *commandList = NIL;
bool includeSequenceDefaults = true; bool includeSequenceDefaults = true;
/* if the table is owned by an extension we only propagate pg_dist_* records */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
if (!tableOwnedByExtension)
{
/* commands to create sequences */ /* commands to create sequences */
List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId); List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
commandList = list_concat(commandList, sequenceDDLCommands); commandList = list_concat(commandList, sequenceDDLCommands);
@ -480,6 +508,7 @@ GetDistributedTableDDLEvents(Oid relationId)
/* command to reset the table owner */ /* command to reset the table owner */
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
commandList = lappend(commandList, tableOwnerResetCommand); commandList = lappend(commandList, tableOwnerResetCommand);
}
/* command to insert pg_dist_partition entry */ /* command to insert pg_dist_partition entry */
char *metadataCommand = DistributionCreateCommand(cacheEntry); char *metadataCommand = DistributionCreateCommand(cacheEntry);
@ -494,6 +523,8 @@ GetDistributedTableDDLEvents(Oid relationId)
List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList); List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
commandList = list_concat(commandList, shardMetadataInsertCommandList); commandList = list_concat(commandList, shardMetadataInsertCommandList);
if (!tableOwnedByExtension)
{
/* commands to create foreign key constraints */ /* commands to create foreign key constraints */
List *foreignConstraintCommands = List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId); GetReferencingForeignConstaintCommands(relationId);
@ -506,6 +537,7 @@ GetDistributedTableDDLEvents(Oid relationId)
GenerateAlterTableAttachPartitionCommand(relationId); GenerateAlterTableAttachPartitionCommand(relationId);
commandList = lappend(commandList, alterTableAttachPartitionCommands); commandList = lappend(commandList, alterTableAttachPartitionCommands);
} }
}
return commandList; return commandList;
} }

View File

@ -24,6 +24,7 @@
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata/distobject.h"
#include "foreign/foreign.h" #include "foreign/foreign.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
@ -100,9 +101,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
performMultipleDeletions(objects, DROP_RESTRICT, performMultipleDeletions(objects, DROP_RESTRICT,
PERFORM_DELETION_INTERNAL); PERFORM_DELETION_INTERNAL);
} }
else else if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{ {
/* drop the table with cascade since other tables may be referring to it */ /*
* If the table is owned by an extension, we cannot drop it, nor should we
* until the user runs DROP EXTENSION. Therefore, we skip dropping the
* table and only delete the metadata.
*
* We drop the table with cascade since other tables may be referring to it.
*/
performDeletion(&distributedTableObject, DROP_CASCADE, performDeletion(&distributedTableObject, DROP_CASCADE,
PERFORM_DELETION_INTERNAL); PERFORM_DELETION_INTERNAL);
} }

View File

@ -22,6 +22,7 @@ extern bool IsObjectDistributed(const ObjectAddress *address);
extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address); extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern bool IsTableOwnedByExtension(Oid relationId);
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
ObjectAddress *extensionAddress); ObjectAddress *extensionAddress);

View File

@ -32,7 +32,6 @@ extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
extern bool ClusterHasKnownMetadataWorkers(void); extern bool ClusterHasKnownMetadataWorkers(void);
extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadata(Oid relationId);
extern List * MetadataCreateCommands(void); extern List * MetadataCreateCommands(void);
extern List * GetDistributedTableDDLEvents(Oid relationId);
extern List * MetadataDropCommands(void); extern List * MetadataDropCommands(void);
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry); extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(const char *schemaName, extern char * DistributionDeleteCommand(const char *schemaName,

View File

@ -243,3 +243,65 @@ SELECT :worker_1_lastval = :worker_2_lastval;
-- the type of sequences can't be changed -- the type of sequences can't be changed
ALTER TABLE mx_sequence ALTER value TYPE BIGINT; ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
ALTER TABLE mx_sequence ALTER value TYPE INT; ALTER TABLE mx_sequence ALTER value TYPE INT;
-- test distributed tables owned by extension
CREATE TABLE seg_test (x int);
INSERT INTO seg_test VALUES (42);
-- pretend this table belongs to an extension
CREATE EXTENSION seg;
ALTER EXTENSION seg ADD TABLE seg_test;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
\c - - - :worker_1_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
\c - - - :worker_2_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
\c - - - :master_port
-- sync table metadata, but skip CREATE TABLE
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
SET citus.replication_model TO streaming;
SELECT create_distributed_table('seg_test', 'x');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.seg_test$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
x
---------------------------------------------------------------------
42
(1 row)
\c - - - :master_port
-- test metadata sync in the presence of an extension-owned table
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
x
---------------------------------------------------------------------
42
(1 row)
\c - - - :master_port
-- also drops table on both worker and master
DROP EXTENSION seg CASCADE;

View File

@ -129,3 +129,50 @@ SELECT :worker_1_lastval = :worker_2_lastval;
ALTER TABLE mx_sequence ALTER value TYPE BIGINT; ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
ALTER TABLE mx_sequence ALTER value TYPE INT; ALTER TABLE mx_sequence ALTER value TYPE INT;
-- test distributed tables owned by extension
CREATE TABLE seg_test (x int);
INSERT INTO seg_test VALUES (42);
-- pretend this table belongs to an extension
CREATE EXTENSION seg;
ALTER EXTENSION seg ADD TABLE seg_test;
\c - - - :worker_1_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
\c - - - :worker_2_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
\c - - - :master_port
-- sync table metadata, but skip CREATE TABLE
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
SET citus.replication_model TO streaming;
SELECT create_distributed_table('seg_test', 'x');
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
\c - - - :master_port
-- test metadata sync in the presence of an extension-owned table
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
\c - - - :master_port
-- also drops table on both worker and master
DROP EXTENSION seg CASCADE;