Propagate MX table metadata with old distributed table creation APIs

This change propagates MX table metadata on the `master_create_distributed_table`
call and shard metadata on the `master_create_worker_shards` call.
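
For illustration, a minimal psql sketch of the flow this enables; the table name `orders` is hypothetical, and the sketch assumes a cluster where metadata sync is already enabled on the workers (hasmetadata=true):

SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;

CREATE TABLE orders (order_id int, details text);

-- with this change, the following also creates the table and its
-- pg_dist_partition entry on all workers with hasmetadata=true
SELECT master_create_distributed_table('orders', 'order_id', 'hash');

-- and this also propagates the pg_dist_shard / pg_dist_shard_placement rows
SELECT master_create_worker_shards('orders', 4, 1);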
pull/1195/head
Eren Basak 2017-02-01 12:15:58 -08:00
parent e96e50746f
commit 8f66f83ba0
8 changed files with 181 additions and 26 deletions

View File

@@ -111,6 +111,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
	ConvertToDistributedTable(distributedRelationId, distributionColumnName,
							  distributionMethod, INVALID_COLOCATION_ID);

	/* create the distributed table metadata on workers */
	if (ShouldSyncTableMetadata(distributedRelationId))
	{
		CreateTableMetadataOnWorkers(distributedRelationId);
	}

	PG_RETURN_VOID();
}

View File

@@ -30,6 +30,7 @@
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
@@ -71,6 +72,11 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
	CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);

	if (ShouldSyncTableMetadata(distributedTableId))
	{
		CreateShardMetadataOnWorkers(distributedTableId);
	}

	PG_RETURN_VOID();
}

View File

@@ -988,9 +988,9 @@ HasMetadataWorkers(void)
 /*
  * CreateTableMetadataOnWorkers creates the list of commands needed to create the
- * given distributed table and sends these commands to all metadata workers i.e. workers
- * with hasmetadata=true. Before sending the commands, in order to prevent recursive
- * propagation, DDL propagation on workers are disabled with a
+ * distributed table with metadata for it and its shards and sends these commands to all
+ * metadata workers i.e. workers with hasmetadata=true. Before sending the commands, in
+ * order to prevent recursive propagation, DDL propagation on workers are disabled with a
  * `SET citus.enable_ddl_propagation TO off;` command.
  */
 void
@@ -1010,3 +1010,28 @@ CreateTableMetadataOnWorkers(Oid relationId)
		SendCommandToWorkers(WORKERS_WITH_METADATA, command);
	}
}
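
To make the recursion guard mentioned in the comment above concrete, here is a hypothetical transcript of the kind of command sequence a metadata worker receives; the CREATE TABLE shown is only a stand-in, since the actual DDL text is generated by the metadata-sync code:

SET citus.enable_ddl_propagation TO off;
-- without the SET above, the worker would itself try to propagate this DDL
-- to the nodes in its metadata, which is the recursive propagation the
-- comment refers to
CREATE TABLE public.mx_old_api (a integer, b bigint NOT NULL);
-- ... followed by the commands that insert the table's pg_dist_partition row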

/*
 * CreateShardMetadataOnWorkers creates the shard metadata of the given relation on
 * metadata workers. The function first creates commands for inserting relevant tuples
 * to pg_dist_shard and pg_dist_shard_placement then sends these commands to metadata
 * workers.
 */
void
CreateShardMetadataOnWorkers(Oid relationId)
{
	List *shardIntervalList = NIL;
	List *commandList = NIL;
	ListCell *commandCell = NULL;

	shardIntervalList = LoadShardIntervalList(relationId);
	commandList = ShardListInsertCommand(shardIntervalList);

	foreach(commandCell, commandList)
	{
		char *command = (char *) lfirst(commandCell);

		SendCommandToWorkers(WORKERS_WITH_METADATA, command);
	}
}
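
For intuition about what ShardListInsertCommand produces, the generated commands amount to catalog inserts roughly like the following; the column lists, hash range, and port are illustrative rather than the literal generated text (the shard id and node match the regression output further below):

-- illustrative only: the real commands are built from the coordinator's catalogs
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
VALUES ('mx_old_api'::regclass, 1310185, 't', '-2147483648', '-1073741825');

INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
VALUES (1310185, 1, 0, 'localhost', 57637);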

View File

@@ -35,6 +35,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
									 uint64 shardLength, char *nodeName, uint32 nodePort);
extern void CreateTableMetadataOnWorkers(Oid relationId);
extern void CreateShardMetadataOnWorkers(Oid relationId);

#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"

View File

@@ -181,22 +181,6 @@ DETAIL: When master_create_distributed_table is called with citus.replication_m
HINT: Use replication factor 1 or set citus.replication_model to streaming and recreate the table
DROP TABLE s_table;
RESET citus.replication_model;
-- Show that it is not possible to create an mx table with the old
-- master_create_distributed_table function
CREATE TABLE mx_table_test (col1 int, col2 text);
SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
repmodel
----------
c
(1 row)
DROP TABLE mx_table_test;
-- Show that when replication factor > 1 the table is created as coordinator-replicated
SET citus.shard_replication_factor TO 2;
CREATE TABLE mx_table_test (col1 int, col2 text);

View File

@@ -1346,6 +1346,102 @@ ORDER BY shardid, nodeport;
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Show that MX tables can be created with the old distributed table creation APIs
-- First show that we can create a streaming distributed table
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1; -- Prevent the spurious replication factor error
CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
repmodel
----------
s
(1 row)
-- Show that the metadata is created on the worker
\c - - - :worker_1_port
\d mx_old_api
Table "public.mx_old_api"
Column | Type | Modifiers
--------+---------+--------------------------------------------------------
a | integer |
b | bigint | not null default nextval('mx_old_api_b_seq'::regclass)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
repmodel
----------
s
(1 row)
-- Create the worker shards and show that the metadata is also on the workers
\c - - - :master_port
SELECT master_create_worker_shards('mx_old_api', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO mx_old_api (a) VALUES (1);
INSERT INTO mx_old_api (a) VALUES (2);
INSERT INTO mx_old_api (a) VALUES (3);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
logicalrelid | shardid | nodename | nodeport
--------------+---------+-----------+----------
mx_old_api | 1310185 | localhost | 57637
mx_old_api | 1310186 | localhost | 57638
mx_old_api | 1310187 | localhost | 57637
mx_old_api | 1310188 | localhost | 57638
(4 rows)
-- Show that the metadata is good on the worker and queries work from it
\c - - - :worker_1_port
INSERT INTO mx_old_api (a) VALUES (10);
INSERT INTO mx_old_api (a) VALUES (20);
INSERT INTO mx_old_api (a) VALUES (30);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
logicalrelid | shardid | nodename | nodeport
--------------+---------+-----------+----------
mx_old_api | 1310185 | localhost | 57637
mx_old_api | 1310186 | localhost | 57638
mx_old_api | 1310187 | localhost | 57637
mx_old_api | 1310188 | localhost | 57638
(4 rows)
SELECT * FROM mx_old_api ORDER BY a;
a | b
----+-----------------
1 | 1
2 | 2
3 | 3
10 | 281474976710657
20 | 281474976710658
30 | 281474976710659
(6 rows)
-- Get back to the master and check that we can see the rows that were INSERTed from the worker
\c - - - :master_port
SELECT * FROM mx_old_api ORDER BY a;
a | b
----+-----------------
1 | 1
2 | 2
3 | 3
10 | 281474976710657
20 | 281474976710658
30 | 281474976710659
(6 rows)
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
@@ -1353,6 +1449,7 @@ NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
DROP TABLE mx_old_api;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------

View File

@@ -138,13 +138,6 @@ DROP TABLE s_table;
RESET citus.replication_model;
-- Show that it is not possible to create an mx table with the old
-- master_create_distributed_table function
CREATE TABLE mx_table_test (col1 int, col2 text);
SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
DROP TABLE mx_table_test;
-- Show that when replication factor > 1 the table is created as coordinator-replicated
SET citus.shard_replication_factor TO 2;
CREATE TABLE mx_table_test (col1 int, col2 text);

View File

@@ -594,12 +594,55 @@ ORDER BY shardid, nodeport;
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Show that MX tables can be created with the old distributed table creation APIs
-- First show that we can create a streaming distributed table
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1; -- Prevent the spurious replication factor error
CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
-- Show that the metadata is created on the worker
\c - - - :worker_1_port
\d mx_old_api
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
-- Create the worker shards and show that the metadata is also on the workers
\c - - - :master_port
SELECT master_create_worker_shards('mx_old_api', 4, 1);
INSERT INTO mx_old_api (a) VALUES (1);
INSERT INTO mx_old_api (a) VALUES (2);
INSERT INTO mx_old_api (a) VALUES (3);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
-- Show that the metadata is good on the worker and queries work from it
\c - - - :worker_1_port
INSERT INTO mx_old_api (a) VALUES (10);
INSERT INTO mx_old_api (a) VALUES (20);
INSERT INTO mx_old_api (a) VALUES (30);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
SELECT * FROM mx_old_api ORDER BY a;
-- Get back to the master and check that we can see the rows that were INSERTed from the worker
\c - - - :master_port
SELECT * FROM mx_old_api ORDER BY a;
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
DROP TABLE mx_old_api;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);