From 048fddf4da4589461a8c4a3f07a56b529837b188 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Thu, 15 Dec 2016 14:00:53 +0300 Subject: [PATCH] Propagate MX table and shard metadata on `create_distributed_table` call --- .../commands/create_distributed_table.c | 20 ++ .../distributed/metadata/metadata_sync.c | 47 +++++ src/include/distributed/metadata_sync.h | 1 + .../regress/expected/multi_metadata_sync.out | 182 +++++++++++++++++- src/test/regress/sql/multi_metadata_sync.sql | 89 ++++++++- 5 files changed, 336 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index f405350e2..8bc516242 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -38,9 +38,11 @@ #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_logical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" +#include "distributed/worker_transaction.h" #include "executor/spi.h" #include "nodes/execnodes.h" #include "nodes/nodeFuncs.h" @@ -173,6 +175,24 @@ create_distributed_table(PG_FUNCTION_ARGS) colocateWithTableName, ShardCount, ShardReplicationFactor); + if (ShouldSyncTableMetadata(relationId)) + { + List *commandList = GetDistributedTableDDLEvents(relationId); + ListCell *commandCell = NULL; + + /* disable DDL propagation on workers */ + SendCommandToWorkers(WORKERS_WITH_METADATA, + "SET citus.enable_ddl_propagation TO off"); + + /* send the commands one by one */ + foreach(commandCell, commandList) + { + char *command = (char *) lfirst(commandCell); + + SendCommandToWorkers(WORKERS_WITH_METADATA, command); + } + } + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 643b6102c..41d86517b 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -271,6 +271,53 @@ MetadataCreateCommands(void) } +/* + * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to + * create the given distributed table on a worker. The list includes setting up any + * sequences, setting the owner of the table, inserting table and shard metadata, + * setting the truncate trigger and foreign key constraints. + */ +List * +GetDistributedTableDDLEvents(Oid relationId) +{ + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + + List *shardIntervalList = NIL; + List *commandList = NIL; + List *foreignConstraintCommands = NIL; + List *shardMetadataInsertCommandList = NIL; + char *tableOwnerResetCommand = NULL; + char *metadataCommand = NULL; + char *truncateTriggerCreateCommand = NULL; + + /* commands to create the table */ + commandList = GetTableDDLEvents(relationId); + + /* command to reset the table owner */ + tableOwnerResetCommand = TableOwnerResetCommand(relationId); + commandList = lappend(commandList, tableOwnerResetCommand); + + /* command to insert pg_dist_partition entry */ + metadataCommand = DistributionCreateCommand(cacheEntry); + commandList = lappend(commandList, metadataCommand); + + /* commands to create the truncate trigger of the mx table */ + truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId); + commandList = lappend(commandList, truncateTriggerCreateCommand); + + /* commands to insert pg_dist_shard & pg_dist_shard_placement entries */ + shardIntervalList = LoadShardIntervalList(relationId); + shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList); + commandList = list_concat(commandList, shardMetadataInsertCommandList); + + /* commands to create foreign key constraints */ + foreignConstraintCommands = GetTableForeignConstraintCommands(relationId); + commandList = list_concat(commandList, foreignConstraintCommands); + + return commandList; +} + + /* * MetadataDropCommands returns list of queries that are required to * drop all the metadata of the node that are related to clustered tables. diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 94efd573a..0aaafc8cb 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -20,6 +20,7 @@ /* Functions declarations for metadata syncing */ extern bool ShouldSyncTableMetadata(Oid relationId); extern List * MetadataCreateCommands(void); +extern List * GetDistributedTableDDLEvents(Oid relationId); extern List * MetadataDropCommands(void); extern char * DistributionCreateCommand(DistTableCacheEntry *cacheEntry); extern char * DistributionDeleteCommand(char *schemaName, diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 1793f0419..cd1ef4a6c 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -409,8 +409,184 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; f (1 row) +-- Test DDL propagation in MX tables +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +SET citus.shard_count = 5; +CREATE SCHEMA mx_test_schema_1; +CREATE SCHEMA mx_test_schema_2; +-- Create MX tables +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); +CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1); +CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text); +CREATE INDEX mx_index_2 ON mx_test_schema_2.mx_table_2 (col2); +ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col1) REFERENCES mx_test_schema_1.mx_table_1(col1); +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_2" btree (col2) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +SELECT create_distributed_table('mx_test_schema_1.mx_table_1', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +-- Check that created tables are marked as streaming replicated tables +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + logicalrelid | repmodel +-----------------------------+---------- + mx_test_schema_1.mx_table_1 | s + mx_test_schema_2.mx_table_2 | s +(2 rows) + +-- See the shards and placements of the mx tables +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + logicalrelid | shardid | nodename | nodeport +-----------------------------+---------+-----------+---------- + mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637 +(10 rows) + + +-- Check that metadata of MX tables exist on the metadata worker +\c - - - :worker_1_port +-- Check that tables are created +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_2" btree (col2) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that table metadata are created +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + logicalrelid | repmodel +-----------------------------+---------- + mx_test_schema_1.mx_table_1 | s + mx_test_schema_2.mx_table_2 | s +(2 rows) + +-- Check that shard and placement data are created +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + logicalrelid | shardid | nodename | nodeport +-----------------------------+---------+-----------+---------- + mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637 +(10 rows) + +-- Check that metadata of MX tables don't exist on the non-metadata worker +\c - - - :worker_2_port +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 +SELECT * FROM pg_dist_partition; + logicalrelid | partmethod | partkey | colocationid | repmodel +--------------+------------+---------+--------------+---------- +(0 rows) + +SELECT * FROM pg_dist_shard; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue +--------------+---------+--------------+---------------+--------------- +(0 rows) + +SELECT * FROM pg_dist_shard_placement; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+----------+----------+------------- +(0 rows) + -- Cleanup \c - - - :worker_1_port +DROP TABLE mx_test_schema_2.mx_table_2; +DROP TABLE mx_test_schema_1.mx_table_1; DROP TABLE mx_testing_schema.mx_test_table; DELETE FROM pg_dist_node; DELETE FROM pg_dist_partition; @@ -430,5 +606,9 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); (1 row) -DROP TABLE mx_testing_schema.mx_test_table CASCADE; +DROP TABLE mx_test_schema_2.mx_table_2; +DROP TABLE mx_test_schema_1.mx_table_1; +DROP TABLE mx_testing_schema.mx_test_table; +RESET citus.shard_count; +RESET citus.shard_replication_factor; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index d33415351..386d3c2c2 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -139,18 +139,103 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; + +-- Test DDL propagation in MX tables +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SET citus.shard_count = 5; +CREATE SCHEMA mx_test_schema_1; +CREATE SCHEMA mx_test_schema_2; + +-- Create MX tables +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); +CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1); + +CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text); +CREATE INDEX mx_index_2 ON mx_test_schema_2.mx_table_2 (col2); +ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col1) REFERENCES mx_test_schema_1.mx_table_1(col1); + +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 + +SELECT create_distributed_table('mx_test_schema_1.mx_table_1', 'col1'); +SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1'); + +-- Check that created tables are marked as streaming replicated tables +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + +-- See the shards and placements of the mx tables +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + +-- Check that metadata of MX tables exist on the metadata worker +\c - - - :worker_1_port + +-- Check that tables are created +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 + +-- Check that table metadata are created +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + +-- Check that shard and placement data are created +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + +-- Check that metadata of MX tables don't exist on the non-metadata worker +\c - - - :worker_2_port + +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 + +SELECT * FROM pg_dist_partition; +SELECT * FROM pg_dist_shard; +SELECT * FROM pg_dist_shard_placement; + -- Cleanup \c - - - :worker_1_port +DROP TABLE mx_test_schema_2.mx_table_2; +DROP TABLE mx_test_schema_1.mx_table_1; DROP TABLE mx_testing_schema.mx_test_table; DELETE FROM pg_dist_node; DELETE FROM pg_dist_partition; DELETE FROM pg_dist_shard; DELETE FROM pg_dist_shard_placement; \d mx_testing_schema.mx_test_table - \c - - - :master_port SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); -DROP TABLE mx_testing_schema.mx_test_table CASCADE; +DROP TABLE mx_test_schema_2.mx_table_2; +DROP TABLE mx_test_schema_1.mx_table_1; +DROP TABLE mx_testing_schema.mx_test_table; + +RESET citus.shard_count; +RESET citus.shard_replication_factor; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;