From bd12555b169d9e9d019220a35604f4783b9702de Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 27 Apr 2020 11:46:36 +0200 Subject: [PATCH] Fix distributing tables owned by extensions --- .../commands/create_distributed_table.c | 9 ++- src/backend/distributed/metadata/dependency.c | 14 ++++ .../distributed/metadata/metadata_sync.c | 78 +++++++++++++------ .../distributed/worker/worker_drop_protocol.c | 11 ++- src/include/distributed/metadata/distobject.h | 1 + src/include/distributed/metadata_sync.h | 1 - src/test/regress/expected/multi_mx_ddl.out | 62 +++++++++++++++ src/test/regress/sql/multi_mx_ddl.sql | 47 +++++++++++ 8 files changed, 196 insertions(+), 27 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5f18e8b96..45c3be8e9 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -27,6 +27,7 @@ #include "catalog/pg_attribute.h" #include "catalog/pg_enum.h" #include "catalog/pg_extension.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" #if PG_VERSION_NUM >= 12000 #include "catalog/pg_proc.h" @@ -156,7 +157,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS) bool viaDeprecatedAPI = true; ObjectAddress tableAddress = { 0 }; - /* * distributed tables might have dependencies on different objects, since we create * shards for a distributed table via multiple sessions these objects will be created @@ -683,6 +683,13 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, TupleDesc relationDesc = RelationGetDescr(relation); char *relationName = RelationGetRelationName(relation); + if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute catalog tables"))); + } + + if (!RelationUsesHeapAccessMethodOrNone(relation)) { ereport(ERROR, (errmsg( diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index d7bcb727d..8d97df9ac 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -631,6 +631,20 @@ SupportedDependencyByCitus(const ObjectAddress *address) } +/* + * IsTableOwnedByExtension returns whether the table with the given relation ID is + * owned by an extension. + */ +bool +IsTableOwnedByExtension(Oid relationId) +{ + ObjectAddress tableAddress = { 0 }; + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + + return IsObjectAddressOwnedByExtension(&tableAddress, NULL); +} + + /* * IsObjectAddressOwnedByExtension returns whether or not the object is owned by an * extension. It is assumed that an object having a dependency on an extension is created diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 2f10a620b..163cea3cc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -36,6 +36,7 @@ #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/metadata/distobject.h" #include "distributed/multi_join_order.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_node.h" @@ -54,6 +55,7 @@ #include "utils/syscache.h" +static List * GetDistributedTableDDLEvents(Oid relationId); static char * LocalGroupIdUpdateCommand(int32 groupId); static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort, int attrNum, bool value); @@ -384,6 +386,12 @@ MetadataCreateCommands(void) Oid relationId = cacheEntry->relationId; ObjectAddress tableAddress = { 0 }; + if (IsTableOwnedByExtension(relationId)) + { + /* skip table creation when the Citus table is owned by an extension */ + continue; + } + List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults); char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); @@ -407,8 +415,16 @@ MetadataCreateCommands(void) /* construct the foreign key constraints after all tables are created */ foreach_ptr(cacheEntry, propagatedTableList) { + Oid relationId = cacheEntry->relationId; + + if (IsTableOwnedByExtension(relationId)) + { + /* skip foreign key creation when the Citus table is owned by an extension */ + continue; + } + List *foreignConstraintCommands = - GetReferencingForeignConstaintCommands(cacheEntry->relationId); + GetReferencingForeignConstaintCommands(relationId); metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, foreignConstraintCommands); @@ -417,10 +433,18 @@ MetadataCreateCommands(void) /* construct partitioning hierarchy after all tables are created */ foreach_ptr(cacheEntry, propagatedTableList) { - if (PartitionTable(cacheEntry->relationId)) + Oid relationId = cacheEntry->relationId; + + if (IsTableOwnedByExtension(relationId)) + { + /* skip partition creation when the Citus table is owned by an extension */ + continue; + } + + if (PartitionTable(relationId)) { char *alterTableAttachPartitionCommands = - GenerateAlterTableAttachPartitionCommand(cacheEntry->relationId); + GenerateAlterTableAttachPartitionCommand(relationId); metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, alterTableAttachPartitionCommands); @@ -461,7 +485,7 @@ MetadataCreateCommands(void) * sequences, setting the owner of the table, inserting table and shard metadata, * setting the truncate trigger and foreign key constraints. */ -List * +static List * GetDistributedTableDDLEvents(Oid relationId) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); @@ -469,17 +493,22 @@ GetDistributedTableDDLEvents(Oid relationId) List *commandList = NIL; bool includeSequenceDefaults = true; - /* commands to create sequences */ - List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId); - commandList = list_concat(commandList, sequenceDDLCommands); + /* if the table is owned by an extension we only propagate pg_dist_* records */ + bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); + if (!tableOwnedByExtension) + { + /* commands to create sequences */ + List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId); + commandList = list_concat(commandList, sequenceDDLCommands); - /* commands to create the table */ - List *tableDDLCommands = GetTableDDLEvents(relationId, includeSequenceDefaults); - commandList = list_concat(commandList, tableDDLCommands); + /* commands to create the table */ + List *tableDDLCommands = GetTableDDLEvents(relationId, includeSequenceDefaults); + commandList = list_concat(commandList, tableDDLCommands); - /* command to reset the table owner */ - char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); - commandList = lappend(commandList, tableOwnerResetCommand); + /* command to reset the table owner */ + char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); + commandList = lappend(commandList, tableOwnerResetCommand); + } /* command to insert pg_dist_partition entry */ char *metadataCommand = DistributionCreateCommand(cacheEntry); @@ -494,17 +523,20 @@ GetDistributedTableDDLEvents(Oid relationId) List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList); commandList = list_concat(commandList, shardMetadataInsertCommandList); - /* commands to create foreign key constraints */ - List *foreignConstraintCommands = - GetReferencingForeignConstaintCommands(relationId); - commandList = list_concat(commandList, foreignConstraintCommands); - - /* commands to create partitioning hierarchy */ - if (PartitionTable(relationId)) + if (!tableOwnedByExtension) { - char *alterTableAttachPartitionCommands = - GenerateAlterTableAttachPartitionCommand(relationId); - commandList = lappend(commandList, alterTableAttachPartitionCommands); + /* commands to create foreign key constraints */ + List *foreignConstraintCommands = + GetReferencingForeignConstaintCommands(relationId); + commandList = list_concat(commandList, foreignConstraintCommands); + + /* commands to create partitioning hierarchy */ + if (PartitionTable(relationId)) + { + char *alterTableAttachPartitionCommands = + GenerateAlterTableAttachPartitionCommand(relationId); + commandList = lappend(commandList, alterTableAttachPartitionCommands); + } } return commandList; diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 925902bca..682a2d95c 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -24,6 +24,7 @@ #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata/distobject.h" #include "foreign/foreign.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -100,9 +101,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) performMultipleDeletions(objects, DROP_RESTRICT, PERFORM_DELETION_INTERNAL); } - else + else if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL)) { - /* drop the table with cascade since other tables may be referring to it */ + /* + * If the table is owned by an extension, we cannot drop it, nor should we + * until the user runs DROP EXTENSION. Therefore, we skip dropping the + * table and only delete the metadata. + * + * We drop the table with cascade since other tables may be referring to it. + */ performDeletion(&distributedTableObject, DROP_CASCADE, PERFORM_DELETION_INTERNAL); } diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index cf2637064..6d857ef3f 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -22,6 +22,7 @@ extern bool IsObjectDistributed(const ObjectAddress *address); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); +extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, ObjectAddress *extensionAddress); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 5cf9a9538..78391ebbf 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -32,7 +32,6 @@ extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort); extern bool ClusterHasKnownMetadataWorkers(void); extern bool ShouldSyncTableMetadata(Oid relationId); extern List * MetadataCreateCommands(void); -extern List * GetDistributedTableDDLEvents(Oid relationId); extern List * MetadataDropCommands(void); extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry); extern char * DistributionDeleteCommand(const char *schemaName, diff --git a/src/test/regress/expected/multi_mx_ddl.out b/src/test/regress/expected/multi_mx_ddl.out index 6deaa6dd0..a7b203df3 100644 --- a/src/test/regress/expected/multi_mx_ddl.out +++ b/src/test/regress/expected/multi_mx_ddl.out @@ -243,3 +243,65 @@ SELECT :worker_1_lastval = :worker_2_lastval; -- the type of sequences can't be changed ALTER TABLE mx_sequence ALTER value TYPE BIGINT; ALTER TABLE mx_sequence ALTER value TYPE INT; +-- test distributed tables owned by extension +CREATE TABLE seg_test (x int); +INSERT INTO seg_test VALUES (42); +-- pretend this table belongs to an extension +CREATE EXTENSION seg; +ALTER EXTENSION seg ADD TABLE seg_test; +NOTICE: Citus does not propagate adding/dropping member objects +HINT: You can add/drop the member objects on the workers as well. +\c - - - :worker_1_port +-- pretend the extension created the table on the worker as well +CREATE TABLE seg_test (x int); +ALTER EXTENSION seg ADD TABLE seg_test; +NOTICE: Citus does not propagate adding/dropping member objects +HINT: You can add/drop the member objects on the workers as well. +\c - - - :worker_2_port +-- pretend the extension created the table on the worker as well +CREATE TABLE seg_test (x int); +ALTER EXTENSION seg ADD TABLE seg_test; +NOTICE: Citus does not propagate adding/dropping member objects +HINT: You can add/drop the member objects on the workers as well. +\c - - - :master_port +-- sync table metadata, but skip CREATE TABLE +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; +SET citus.replication_model TO streaming; +SELECT create_distributed_table('seg_test', 'x'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.seg_test$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +-- should be able to see contents from worker +SELECT * FROM seg_test; + x +--------------------------------------------------------------------- + 42 +(1 row) + +\c - - - :master_port +-- test metadata sync in the presence of an extension-owned table +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +-- should be able to see contents from worker +SELECT * FROM seg_test; + x +--------------------------------------------------------------------- + 42 +(1 row) + +\c - - - :master_port +-- also drops table on both worker and master +DROP EXTENSION seg CASCADE; diff --git a/src/test/regress/sql/multi_mx_ddl.sql b/src/test/regress/sql/multi_mx_ddl.sql index 956238995..0d3e26e88 100644 --- a/src/test/regress/sql/multi_mx_ddl.sql +++ b/src/test/regress/sql/multi_mx_ddl.sql @@ -129,3 +129,50 @@ SELECT :worker_1_lastval = :worker_2_lastval; ALTER TABLE mx_sequence ALTER value TYPE BIGINT; ALTER TABLE mx_sequence ALTER value TYPE INT; +-- test distributed tables owned by extension +CREATE TABLE seg_test (x int); +INSERT INTO seg_test VALUES (42); + +-- pretend this table belongs to an extension +CREATE EXTENSION seg; +ALTER EXTENSION seg ADD TABLE seg_test; + +\c - - - :worker_1_port + +-- pretend the extension created the table on the worker as well +CREATE TABLE seg_test (x int); +ALTER EXTENSION seg ADD TABLE seg_test; + +\c - - - :worker_2_port + +-- pretend the extension created the table on the worker as well +CREATE TABLE seg_test (x int); +ALTER EXTENSION seg ADD TABLE seg_test; + +\c - - - :master_port + +-- sync table metadata, but skip CREATE TABLE +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; +SET citus.replication_model TO streaming; +SELECT create_distributed_table('seg_test', 'x'); + +\c - - - :worker_1_port + +-- should be able to see contents from worker +SELECT * FROM seg_test; + +\c - - - :master_port + +-- test metadata sync in the presence of an extension-owned table +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +\c - - - :worker_1_port + +-- should be able to see contents from worker +SELECT * FROM seg_test; + +\c - - - :master_port + +-- also drops table on both worker and master +DROP EXTENSION seg CASCADE;