Propagate MX table and shard metadata on `create_distributed_table` call

pull/1045/head
Eren Basak 2016-12-15 14:00:53 +03:00
parent efcb1f9dd9
commit 048fddf4da
5 changed files with 336 additions and 3 deletions

View File

@ -38,9 +38,11 @@
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h" #include "executor/spi.h"
#include "nodes/execnodes.h" #include "nodes/execnodes.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
@ -173,6 +175,24 @@ create_distributed_table(PG_FUNCTION_ARGS)
colocateWithTableName, ShardCount, colocateWithTableName, ShardCount,
ShardReplicationFactor); ShardReplicationFactor);
if (ShouldSyncTableMetadata(relationId))
{
List *commandList = GetDistributedTableDDLEvents(relationId);
ListCell *commandCell = NULL;
/* disable DDL propagation on workers */
SendCommandToWorkers(WORKERS_WITH_METADATA,
"SET citus.enable_ddl_propagation TO off");
/* send the commands one by one */
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -271,6 +271,53 @@ MetadataCreateCommands(void)
} }
/*
* GetDistributedTableDDLEvents returns the full set of DDL commands necessary to
* create the given distributed table on a worker. The list includes setting up any
* sequences, setting the owner of the table, inserting table and shard metadata,
* setting the truncate trigger and foreign key constraints.
*/
List *
GetDistributedTableDDLEvents(Oid relationId)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
List *shardIntervalList = NIL;
List *commandList = NIL;
List *foreignConstraintCommands = NIL;
List *shardMetadataInsertCommandList = NIL;
char *tableOwnerResetCommand = NULL;
char *metadataCommand = NULL;
char *truncateTriggerCreateCommand = NULL;
/* commands to create the table */
commandList = GetTableDDLEvents(relationId);
/* command to reset the table owner */
tableOwnerResetCommand = TableOwnerResetCommand(relationId);
commandList = lappend(commandList, tableOwnerResetCommand);
/* command to insert pg_dist_partition entry */
metadataCommand = DistributionCreateCommand(cacheEntry);
commandList = lappend(commandList, metadataCommand);
/* commands to create the truncate trigger of the mx table */
truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
/* commands to insert pg_dist_shard & pg_dist_shard_placement entries */
shardIntervalList = LoadShardIntervalList(relationId);
shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
commandList = list_concat(commandList, shardMetadataInsertCommandList);
/* commands to create foreign key constraints */
foreignConstraintCommands = GetTableForeignConstraintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands);
return commandList;
}
/* /*
* MetadataDropCommands returns list of queries that are required to * MetadataDropCommands returns list of queries that are required to
* drop all the metadata of the node that are related to clustered tables. * drop all the metadata of the node that are related to clustered tables.

View File

@ -20,6 +20,7 @@
/* Functions declarations for metadata syncing */ /* Functions declarations for metadata syncing */
extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadata(Oid relationId);
extern List * MetadataCreateCommands(void); extern List * MetadataCreateCommands(void);
extern List * GetDistributedTableDDLEvents(Oid relationId);
extern List * MetadataDropCommands(void); extern List * MetadataDropCommands(void);
extern char * DistributionCreateCommand(DistTableCacheEntry *cacheEntry); extern char * DistributionCreateCommand(DistTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(char *schemaName, extern char * DistributionDeleteCommand(char *schemaName,

View File

@ -409,8 +409,184 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
f f
(1 row) (1 row)
-- Test DDL propagation in MX tables
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
SET citus.shard_count = 5;
CREATE SCHEMA mx_test_schema_1;
CREATE SCHEMA mx_test_schema_2;
-- Create MX tables
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text);
CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1);
CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text);
CREATE INDEX mx_index_2 ON mx_test_schema_2.mx_table_2 (col2);
ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col1) REFERENCES mx_test_schema_1.mx_table_1(col1);
\d mx_test_schema_1.mx_table_1
Table "mx_test_schema_1.mx_table_1"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
Indexes:
"mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1)
"mx_index_1" btree (col1)
Referenced by:
TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
\d mx_test_schema_2.mx_table_2
Table "mx_test_schema_2.mx_table_2"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
Indexes:
"mx_index_2" btree (col2)
Foreign-key constraints:
"mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
SELECT create_distributed_table('mx_test_schema_1.mx_table_1', 'col1');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1');
create_distributed_table
--------------------------
(1 row)
-- Check that created tables are marked as streaming replicated tables
SELECT
logicalrelid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
logicalrelid | repmodel
-----------------------------+----------
mx_test_schema_1.mx_table_1 | s
mx_test_schema_2.mx_table_2 | s
(2 rows)
-- See the shards and placements of the mx tables
SELECT
logicalrelid, shardid, nodename, nodeport
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY
logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport
-----------------------------+---------+-----------+----------
mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637
mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638
mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637
mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638
mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637
mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637
mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638
mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637
mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638
mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637
(10 rows)
-- Check that metadata of MX tables exist on the metadata worker
\c - - - :worker_1_port
-- Check that tables are created
\d mx_test_schema_1.mx_table_1
Table "mx_test_schema_1.mx_table_1"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
Indexes:
"mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1)
"mx_index_1" btree (col1)
Referenced by:
TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
\d mx_test_schema_2.mx_table_2
Table "mx_test_schema_2.mx_table_2"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
Indexes:
"mx_index_2" btree (col2)
Foreign-key constraints:
"mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
-- Check that table metadata are created
SELECT
logicalrelid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
logicalrelid | repmodel
-----------------------------+----------
mx_test_schema_1.mx_table_1 | s
mx_test_schema_2.mx_table_2 | s
(2 rows)
-- Check that shard and placement data are created
SELECT
logicalrelid, shardid, nodename, nodeport
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY
logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport
-----------------------------+---------+-----------+----------
mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637
mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638
mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637
mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638
mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637
mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637
mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638
mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637
mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638
mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637
(10 rows)
-- Check that metadata of MX tables don't exist on the non-metadata worker
\c - - - :worker_2_port
\d mx_test_schema_1.mx_table_1
\d mx_test_schema_2.mx_table_2
SELECT * FROM pg_dist_partition;
logicalrelid | partmethod | partkey | colocationid | repmodel
--------------+------------+---------+--------------+----------
(0 rows)
SELECT * FROM pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
(0 rows)
SELECT * FROM pg_dist_shard_placement;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+----------+----------+-------------
(0 rows)
-- Cleanup -- Cleanup
\c - - - :worker_1_port \c - - - :worker_1_port
DROP TABLE mx_test_schema_2.mx_table_2;
DROP TABLE mx_test_schema_1.mx_table_1;
DROP TABLE mx_testing_schema.mx_test_table; DROP TABLE mx_testing_schema.mx_test_table;
DELETE FROM pg_dist_node; DELETE FROM pg_dist_node;
DELETE FROM pg_dist_partition; DELETE FROM pg_dist_partition;
@ -430,5 +606,9 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
(1 row) (1 row)
DROP TABLE mx_testing_schema.mx_test_table CASCADE; DROP TABLE mx_test_schema_2.mx_table_2;
DROP TABLE mx_test_schema_1.mx_table_1;
DROP TABLE mx_testing_schema.mx_test_table;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;

View File

@ -139,18 +139,103 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
-- Test DDL propagation in MX tables
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET citus.shard_count = 5;
CREATE SCHEMA mx_test_schema_1;
CREATE SCHEMA mx_test_schema_2;
-- Create MX tables
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text);
CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1);
CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text);
CREATE INDEX mx_index_2 ON mx_test_schema_2.mx_table_2 (col2);
ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col1) REFERENCES mx_test_schema_1.mx_table_1(col1);
\d mx_test_schema_1.mx_table_1
\d mx_test_schema_2.mx_table_2
SELECT create_distributed_table('mx_test_schema_1.mx_table_1', 'col1');
SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1');
-- Check that created tables are marked as streaming replicated tables
SELECT
logicalrelid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
-- See the shards and placements of the mx tables
SELECT
logicalrelid, shardid, nodename, nodeport
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY
logicalrelid, shardid;
-- Check that metadata of MX tables exist on the metadata worker
\c - - - :worker_1_port
-- Check that tables are created
\d mx_test_schema_1.mx_table_1
\d mx_test_schema_2.mx_table_2
-- Check that table metadata are created
SELECT
logicalrelid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
-- Check that shard and placement data are created
SELECT
logicalrelid, shardid, nodename, nodeport
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
ORDER BY
logicalrelid, shardid;
-- Check that metadata of MX tables don't exist on the non-metadata worker
\c - - - :worker_2_port
\d mx_test_schema_1.mx_table_1
\d mx_test_schema_2.mx_table_2
SELECT * FROM pg_dist_partition;
SELECT * FROM pg_dist_shard;
SELECT * FROM pg_dist_shard_placement;
-- Cleanup -- Cleanup
\c - - - :worker_1_port \c - - - :worker_1_port
DROP TABLE mx_test_schema_2.mx_table_2;
DROP TABLE mx_test_schema_1.mx_table_1;
DROP TABLE mx_testing_schema.mx_test_table; DROP TABLE mx_testing_schema.mx_test_table;
DELETE FROM pg_dist_node; DELETE FROM pg_dist_node;
DELETE FROM pg_dist_partition; DELETE FROM pg_dist_partition;
DELETE FROM pg_dist_shard; DELETE FROM pg_dist_shard;
DELETE FROM pg_dist_shard_placement; DELETE FROM pg_dist_shard_placement;
\d mx_testing_schema.mx_test_table \d mx_testing_schema.mx_test_table
\c - - - :master_port \c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
DROP TABLE mx_testing_schema.mx_test_table CASCADE; DROP TABLE mx_test_schema_2.mx_table_2;
DROP TABLE mx_test_schema_1.mx_table_1;
DROP TABLE mx_testing_schema.mx_test_table;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;