Merge pull request #3938 from citusdata/fix/extension-dist-tables

pull/4162/head
Marco Slot 2020-09-11 12:24:35 +02:00 committed by GitHub
commit 94736ce78d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 205 additions and 27 deletions

View File

@ -27,6 +27,7 @@
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#if PG_VERSION_NUM >= 12000
#include "catalog/pg_proc.h"
@ -156,7 +157,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
bool viaDeprecatedAPI = true;
ObjectAddress tableAddress = { 0 };
/*
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
@ -683,6 +683,13 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
TupleDesc relationDesc = RelationGetDescr(relation);
char *relationName = RelationGetRelationName(relation);
if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute catalog tables")));
}
if (!RelationUsesHeapAccessMethodOrNone(relation))
{
ereport(ERROR, (errmsg(

View File

@ -631,6 +631,20 @@ SupportedDependencyByCitus(const ObjectAddress *address)
}
/*
* IsTableOwnedByExtension returns whether the table with the given relation ID is
* owned by an extension.
*/
bool
IsTableOwnedByExtension(Oid relationId)
{
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
return IsObjectAddressOwnedByExtension(&tableAddress, NULL);
}
/*
* IsObjectAddressOwnedByExtension returns whether or not the object is owned by an
* extension. It is assumed that an object having a dependency on an extension is created

View File

@ -36,6 +36,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_node.h"
@ -54,6 +55,7 @@
#include "utils/syscache.h"
static List * GetDistributedTableDDLEvents(Oid relationId);
static char * LocalGroupIdUpdateCommand(int32 groupId);
static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort,
int attrNum, bool value);
@ -384,6 +386,12 @@ MetadataCreateCommands(void)
Oid relationId = cacheEntry->relationId;
ObjectAddress tableAddress = { 0 };
if (IsTableOwnedByExtension(relationId))
{
/* skip table creation when the Citus table is owned by an extension */
continue;
}
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
@ -407,8 +415,16 @@ MetadataCreateCommands(void)
/* construct the foreign key constraints after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip foreign key creation when the Citus table is owned by an extension */
continue;
}
List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(cacheEntry->relationId);
GetReferencingForeignConstaintCommands(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
foreignConstraintCommands);
@ -417,10 +433,18 @@ MetadataCreateCommands(void)
/* construct partitioning hierarchy after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList)
{
if (PartitionTable(cacheEntry->relationId))
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip partition creation when the Citus table is owned by an extension */
continue;
}
if (PartitionTable(relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(cacheEntry->relationId);
GenerateAlterTableAttachPartitionCommand(relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
alterTableAttachPartitionCommands);
@ -461,7 +485,7 @@ MetadataCreateCommands(void)
* sequences, setting the owner of the table, inserting table and shard metadata,
* setting the truncate trigger and foreign key constraints.
*/
List *
static List *
GetDistributedTableDDLEvents(Oid relationId)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
@ -469,17 +493,22 @@ GetDistributedTableDDLEvents(Oid relationId)
List *commandList = NIL;
bool includeSequenceDefaults = true;
/* commands to create sequences */
List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
commandList = list_concat(commandList, sequenceDDLCommands);
/* if the table is owned by an extension we only propagate pg_dist_* records */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
if (!tableOwnedByExtension)
{
/* commands to create sequences */
List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
commandList = list_concat(commandList, sequenceDDLCommands);
/* commands to create the table */
List *tableDDLCommands = GetTableDDLEvents(relationId, includeSequenceDefaults);
commandList = list_concat(commandList, tableDDLCommands);
/* commands to create the table */
List *tableDDLCommands = GetTableDDLEvents(relationId, includeSequenceDefaults);
commandList = list_concat(commandList, tableDDLCommands);
/* command to reset the table owner */
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
commandList = lappend(commandList, tableOwnerResetCommand);
/* command to reset the table owner */
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
commandList = lappend(commandList, tableOwnerResetCommand);
}
/* command to insert pg_dist_partition entry */
char *metadataCommand = DistributionCreateCommand(cacheEntry);
@ -494,17 +523,20 @@ GetDistributedTableDDLEvents(Oid relationId)
List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
commandList = list_concat(commandList, shardMetadataInsertCommandList);
/* commands to create foreign key constraints */
List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands);
/* commands to create partitioning hierarchy */
if (PartitionTable(relationId))
if (!tableOwnedByExtension)
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(relationId);
commandList = lappend(commandList, alterTableAttachPartitionCommands);
/* commands to create foreign key constraints */
List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands);
/* commands to create partitioning hierarchy */
if (PartitionTable(relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(relationId);
commandList = lappend(commandList, alterTableAttachPartitionCommands);
}
}
return commandList;

View File

@ -24,6 +24,7 @@
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata/distobject.h"
#include "foreign/foreign.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@ -100,9 +101,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
performMultipleDeletions(objects, DROP_RESTRICT,
PERFORM_DELETION_INTERNAL);
}
else
else if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{
/* drop the table with cascade since other tables may be referring to it */
/*
* If the table is owned by an extension, we cannot drop it, nor should we
* until the user runs DROP EXTENSION. Therefore, we skip dropping the
* table and only delete the metadata.
*
* We drop the table with cascade since other tables may be referring to it.
*/
performDeletion(&distributedTableObject, DROP_CASCADE,
PERFORM_DELETION_INTERNAL);
}

View File

@ -22,6 +22,7 @@ extern bool IsObjectDistributed(const ObjectAddress *address);
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern bool IsTableOwnedByExtension(Oid relationId);
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
ObjectAddress *extensionAddress);

View File

@ -32,7 +32,6 @@ extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
extern bool ClusterHasKnownMetadataWorkers(void);
extern bool ShouldSyncTableMetadata(Oid relationId);
extern List * MetadataCreateCommands(void);
extern List * GetDistributedTableDDLEvents(Oid relationId);
extern List * MetadataDropCommands(void);
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(const char *schemaName,

View File

@ -317,3 +317,8 @@ WHERE col1 = 132;
DROP TABLE data_load_test1, data_load_test2;
END;
-- distributing catalog tables is not supported
SELECT create_distributed_table('pg_class', 'relname');
ERROR: cannot distribute catalog tables
SELECT create_reference_table('pg_class');
ERROR: cannot distribute catalog tables

View File

@ -243,3 +243,65 @@ SELECT :worker_1_lastval = :worker_2_lastval;
-- the type of sequences can't be changed
ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
ALTER TABLE mx_sequence ALTER value TYPE INT;
-- test distributed tables owned by extension
CREATE TABLE seg_test (x int);
INSERT INTO seg_test VALUES (42);
-- pretend this table belongs to an extension
CREATE EXTENSION seg;
ALTER EXTENSION seg ADD TABLE seg_test;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
\c - - - :worker_1_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
\c - - - :worker_2_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
\c - - - :master_port
-- sync table metadata, but skip CREATE TABLE
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
SET citus.replication_model TO streaming;
SELECT create_distributed_table('seg_test', 'x');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.seg_test$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
x
---------------------------------------------------------------------
42
(1 row)
\c - - - :master_port
-- test metadata sync in the presence of an extension-owned table
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
x
---------------------------------------------------------------------
42
(1 row)
\c - - - :master_port
-- also drops table on both worker and master
DROP EXTENSION seg CASCADE;

View File

@ -201,3 +201,7 @@ WHERE col1 = 132;
DROP TABLE data_load_test1, data_load_test2;
END;
-- distributing catalog tables is not supported
SELECT create_distributed_table('pg_class', 'relname');
SELECT create_reference_table('pg_class');

View File

@ -129,3 +129,50 @@ SELECT :worker_1_lastval = :worker_2_lastval;
ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
ALTER TABLE mx_sequence ALTER value TYPE INT;
-- test distributed tables owned by extension
CREATE TABLE seg_test (x int);
INSERT INTO seg_test VALUES (42);
-- pretend this table belongs to an extension
CREATE EXTENSION seg;
ALTER EXTENSION seg ADD TABLE seg_test;
\c - - - :worker_1_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
\c - - - :worker_2_port
-- pretend the extension created the table on the worker as well
CREATE TABLE seg_test (x int);
ALTER EXTENSION seg ADD TABLE seg_test;
\c - - - :master_port
-- sync table metadata, but skip CREATE TABLE
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
SET citus.replication_model TO streaming;
SELECT create_distributed_table('seg_test', 'x');
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
\c - - - :master_port
-- test metadata sync in the presence of an extension-owned table
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
\c - - - :worker_1_port
-- should be able to see contents from worker
SELECT * FROM seg_test;
\c - - - :master_port
-- also drops table on both worker and master
DROP EXTENSION seg CASCADE;