pull/1195/merge
Eren Başak 2017-02-11 21:13:25 +00:00 committed by GitHub
commit eb8750fa6f
8 changed files with 222 additions and 18 deletions

View File

@@ -111,6 +111,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
distributionMethod, INVALID_COLOCATION_ID);
/* create the distributed table metadata on workers */
if (ShouldSyncTableMetadata(distributedRelationId))
{
CreateTableMetadataOnWorkers(distributedRelationId);
}
PG_RETURN_VOID();
}
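A minimal usage sketch of what this hunk enables, not part of the diff: once a worker has been enabled for metadata sync, the old API now also creates the table metadata there. The table name and port below are illustrative; start_metadata_sync_to_node and pg_dist_partition are existing Citus objects.
SELECT start_metadata_sync_to_node('localhost', 57637);  -- marks the worker with hasmetadata = true
SET citus.replication_model TO 'streaming';
CREATE TABLE events (id int, payload text);
SELECT master_create_distributed_table('events', 'id', 'hash');
-- on the worker, the table should now be visible in the metadata:
-- SELECT repmodel FROM pg_dist_partition WHERE logicalrelid = 'events'::regclass;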

View File

@@ -30,6 +30,7 @@
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
@@ -71,6 +72,11 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);
if (ShouldSyncTableMetadata(distributedTableId))
{
CreateShardMetadataOnWorkers(distributedTableId);
}
PG_RETURN_VOID();
}
@@ -97,6 +103,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
uint64 hashTokenIncrement = 0;
List *existingShardList = NIL;
int64 shardIndex = 0;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
/* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId);
@@ -138,6 +145,24 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
errmsg("replication_factor must be positive")));
}
/* make sure that RF=1 if the table is streaming replicated */
if (cacheEntry->replicationModel == REPLICATION_MODEL_STREAMING &&
replicationFactor > 1)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("using replication factor %d with streaming "
"replicated tables are not supported",
replicationFactor),
errdetail("When master_create_distributed_table is called with "
"citus.replication_model streaming, then the table is "
"marked for streaming replication and the shard "
"replication factor of streaming replicated tables "
"must be 1."),
errhint("Use replication factor 1 or set "
"citus.replication_model to streaming and recreate the "
"table")));
}
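As a sketch of how the new check behaves (illustrative; s_table mirrors the regression test below, and 'statement' is assumed to be the non-streaming value of citus.replication_model):
SELECT master_create_worker_shards('s_table', 4, 2);  -- rejected: replication factor > 1 on a streaming-replicated table
SELECT master_create_worker_shards('s_table', 4, 1);  -- accepted: replication factor 1
-- alternatively, recreate the table with citus.replication_model set to 'statement',
-- after which a replication factor above 1 is allowed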
/* calculate the split of the hash space */
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;

View File

@@ -988,9 +988,9 @@ HasMetadataWorkers(void)
/*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the
* given distributed table and sends these commands to all metadata workers i.e. workers
* with hasmetadata=true. Before sending the commands, in order to prevent recursive
* propagation, DDL propagation on workers is disabled with a
* distributed table with metadata for it and its shards and sends these commands to all
* metadata workers i.e. workers with hasmetadata=true. Before sending the commands, in
* order to prevent recursive propagation, DDL propagation on workers is disabled with a
* `SET citus.enable_ddl_propagation TO off;` command.
*/
void
@@ -1010,3 +1010,28 @@ CreateTableMetadataOnWorkers(Oid relationId)
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}
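To picture the guard described in the comment above, the command stream sent to each metadata worker looks roughly like the following sketch; the actual commands are generated in C and elided here.
SET citus.enable_ddl_propagation TO off;
-- generated commands that create the table and its metadata on the worker (elided)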
/*
* CreateShardMetadataOnWorkers creates the shard metadata of the given relation on
* metadata workers. The function first creates the commands for inserting the relevant
* tuples into pg_dist_shard and pg_dist_shard_placement, and then sends these commands
* to the metadata workers.
*/
void
CreateShardMetadataOnWorkers(Oid relationId)
{
List *shardIntervalList = NIL;
List *commandList = NIL;
ListCell *commandCell = NULL;
shardIntervalList = LoadShardIntervalList(relationId);
commandList = ShardListInsertCommand(shardIntervalList);
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}
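As a quick sanity check, not part of this diff, the effect of CreateShardMetadataOnWorkers can be observed from a metadata worker by querying the shard catalogs; mx_old_api is the table used in the regression test later in this commit.
-- on a worker with hasmetadata = true, after master_create_worker_shards:
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'mx_old_api'::regclass
ORDER BY shardid;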

View File

@@ -35,6 +35,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort);
extern void CreateTableMetadataOnWorkers(Oid relationId);
extern void CreateShardMetadataOnWorkers(Oid relationId);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"

View File

@@ -160,23 +160,27 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
(1 row)
DROP TABLE mx_table_test;
RESET citus.replication_model;
-- Show that it is not possible to create an mx table with the old
-- master_create_distributed_table function
CREATE TABLE mx_table_test (col1 int, col2 text);
SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
-- Show that master_create_distributed_table honors the citus.replication_model GUC
CREATE TABLE s_table(a int);
SELECT master_create_distributed_table('s_table', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
repmodel
----------
c
s
(1 row)
DROP TABLE mx_table_test;
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
SELECT master_create_worker_shards('s_table', 4, 2);
ERROR: using replication factor 2 with streaming replicated tables is not supported
DETAIL: When master_create_distributed_table is called with citus.replication_model set to streaming, the table is marked for streaming replication, and the shard replication factor of streaming replicated tables must be 1.
HINT: Use replication factor 1 or set citus.replication_model to statement and recreate the table
DROP TABLE s_table;
RESET citus.replication_model;
-- Show that when replication factor > 1 the table is created as coordinator-replicated
SET citus.shard_replication_factor TO 2;
CREATE TABLE mx_table_test (col1 int, col2 text);

View File

@@ -1346,6 +1346,102 @@ ORDER BY shardid, nodeport;
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Show that MX tables can be created with the old distributed table creation APIs
-- First show that we can create a streaming distributed table
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1; -- Prevent the unrelated replication factor error
CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
repmodel
----------
s
(1 row)
-- Show that the metadata is created on the worker
\c - - - :worker_1_port
\d mx_old_api
Table "public.mx_old_api"
Column | Type | Modifiers
--------+---------+--------------------------------------------------------
a | integer |
b | bigint | not null default nextval('mx_old_api_b_seq'::regclass)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
repmodel
----------
s
(1 row)
-- Create the worker shards and show that the metadata is also on the workers
\c - - - :master_port
SELECT master_create_worker_shards('mx_old_api', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO mx_old_api (a) VALUES (1);
INSERT INTO mx_old_api (a) VALUES (2);
INSERT INTO mx_old_api (a) VALUES (3);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
logicalrelid | shardid | nodename | nodeport
--------------+---------+-----------+----------
mx_old_api | 1310185 | localhost | 57637
mx_old_api | 1310186 | localhost | 57638
mx_old_api | 1310187 | localhost | 57637
mx_old_api | 1310188 | localhost | 57638
(4 rows)
-- Show that the metadata is correct on the worker and queries work from it
\c - - - :worker_1_port
INSERT INTO mx_old_api (a) VALUES (10);
INSERT INTO mx_old_api (a) VALUES (20);
INSERT INTO mx_old_api (a) VALUES (30);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
logicalrelid | shardid | nodename | nodeport
--------------+---------+-----------+----------
mx_old_api | 1310185 | localhost | 57637
mx_old_api | 1310186 | localhost | 57638
mx_old_api | 1310187 | localhost | 57637
mx_old_api | 1310188 | localhost | 57638
(4 rows)
SELECT * FROM mx_old_api ORDER BY a;
a | b
----+-----------------
1 | 1
2 | 2
3 | 3
10 | 281474976710657
20 | 281474976710658
30 | 281474976710659
(6 rows)
-- Get back to the master and check that we can see the rows we INSERTed from the worker
\c - - - :master_port
SELECT * FROM mx_old_api ORDER BY a;
a | b
----+-----------------
1 | 1
2 | 2
3 | 3
10 | 281474976710657
20 | 281474976710658
30 | 281474976710659
(6 rows)
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
@@ -1353,6 +1449,7 @@ NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
DROP TABLE mx_old_api;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------

View File

@@ -126,14 +126,17 @@ SELECT create_distributed_table('mx_table_test', 'col1');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
DROP TABLE mx_table_test;
RESET citus.replication_model;
-- Show that master_create_distributed_table honors the citus.replication_model GUC
CREATE TABLE s_table(a int);
SELECT master_create_distributed_table('s_table', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
-- Show that it is not possible to create an mx table with the old
-- master_create_distributed_table function
CREATE TABLE mx_table_test (col1 int, col2 text);
SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
DROP TABLE mx_table_test;
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
SELECT master_create_worker_shards('s_table', 4, 2);
DROP TABLE s_table;
RESET citus.replication_model;
-- Show that when replication factor > 1 the table is created as coordinator-replicated
SET citus.shard_replication_factor TO 2;

View File

@@ -594,12 +594,55 @@ ORDER BY shardid, nodeport;
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Show that MX tables can be created with the old distributed table creation APIs
-- First show that we can create a streaming distributed table
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1; -- Prevent the unrelated replication factor error
CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
-- Show that the metadata is created on the worker
\c - - - :worker_1_port
\d mx_old_api
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
-- Create the worker shards and show that the metadata is also on the workers
\c - - - :master_port
SELECT master_create_worker_shards('mx_old_api', 4, 1);
INSERT INTO mx_old_api (a) VALUES (1);
INSERT INTO mx_old_api (a) VALUES (2);
INSERT INTO mx_old_api (a) VALUES (3);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
-- Show that the metadata is correct on the worker and queries work from it
\c - - - :worker_1_port
INSERT INTO mx_old_api (a) VALUES (10);
INSERT INTO mx_old_api (a) VALUES (20);
INSERT INTO mx_old_api (a) VALUES (30);
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_old_api'::regclass
ORDER BY shardid, nodename, nodeport;
SELECT * FROM mx_old_api ORDER BY a;
-- Get back to the master and check that we can see the rows we INSERTed from the worker
\c - - - :master_port
SELECT * FROM mx_old_api ORDER BY a;
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
DROP TABLE mx_old_api;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);