From e96e50746f80662aadca66f4bc9b9c2d9fd09c0b Mon Sep 17 00:00:00 2001
From: Eren Basak
Date: Wed, 1 Feb 2017 11:17:09 -0800
Subject: [PATCH 1/2] Prevent RF>1 on streaming replicated tables in `master_create_worker_shards`

Prior to this change, the `master_create_worker_shards` command did not
check the replication model of the target table, which allowed RF>1 on
streaming replicated tables. With this change, `master_create_worker_shards`
errors out in that case.
---
 .../distributed/master/master_create_shards.c | 19 ++++++++++++++++++
 .../regress/expected/multi_create_table.out   | 20 +++++++++++++++++++
 src/test/regress/sql/multi_create_table.sql   | 10 ++++++++++
 3 files changed, 49 insertions(+)

diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index 7c5a21011..ecf3c97e7 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -97,6 +97,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 	uint64 hashTokenIncrement = 0;
 	List *existingShardList = NIL;
 	int64 shardIndex = 0;
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
 
 	/* make sure table is hash partitioned */
 	CheckHashPartitionedTable(distributedTableId);
@@ -138,6 +139,24 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 						errmsg("replication_factor must be positive")));
 	}
 
+	/* make sure that RF=1 if the table is streaming replicated */
+	if (cacheEntry->replicationModel == REPLICATION_MODEL_STREAMING &&
+		replicationFactor > 1)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("using replication factor %d with streaming "
+							   "replicated tables is not supported",
+							   replicationFactor),
+						errdetail("When master_create_distributed_table is called with "
+								  "citus.replication_model set to streaming, the table "
+								  "is marked for streaming replication and the shard "
+								  "replication factor of streaming replicated tables "
+								  "must be 1."),
+						errhint("Use replication factor 1 or set "
+								"citus.replication_model to statement and recreate "
+								"the table")));
+	}
+
 	/* calculate the split of the hash space */
 	hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
 
diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out
index 266b1d794..3120c835d 100644
--- a/src/test/regress/expected/multi_create_table.out
+++ b/src/test/regress/expected/multi_create_table.out
@@ -160,6 +160,26 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
 (1 row)
 
 DROP TABLE mx_table_test;
+-- Show that master_create_distributed_table honors the citus.replication_model GUC
+CREATE TABLE s_table(a int);
+SELECT master_create_distributed_table('s_table', 'a', 'hash');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
+ repmodel 
+----------
+ s
+(1 row)
+
+-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
+SELECT master_create_worker_shards('s_table', 4, 2);
+ERROR: using replication factor 2 with streaming replicated tables is not supported
+DETAIL: When master_create_distributed_table is called with citus.replication_model set to streaming, the table is marked for streaming replication and the shard replication factor of streaming replicated tables must be 1.
+HINT: Use replication factor 1 or set citus.replication_model to statement and recreate the table
+DROP TABLE s_table;
 RESET citus.replication_model;
 -- Show that it is not possible to create an mx table with the old
 -- master_create_distributed_table function
diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql
index 5bb175a3f..73b277a28 100644
--- a/src/test/regress/sql/multi_create_table.sql
+++ b/src/test/regress/sql/multi_create_table.sql
@@ -126,6 +126,16 @@ SELECT create_distributed_table('mx_table_test', 'col1');
 SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
 DROP TABLE mx_table_test;
 
+-- Show that master_create_distributed_table honors the citus.replication_model GUC
+CREATE TABLE s_table(a int);
+SELECT master_create_distributed_table('s_table', 'a', 'hash');
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
+
+-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
+SELECT master_create_worker_shards('s_table', 4, 2);
+
+DROP TABLE s_table;
+
 RESET citus.replication_model;
 
 -- Show that it is not possible to create an mx table with the old

From 8f66f83ba016a4210f2a5d2ab365d68e1c128dd5 Mon Sep 17 00:00:00 2001
From: Eren Basak
Date: Wed, 1 Feb 2017 12:15:58 -0800
Subject: [PATCH 2/2] Propagate MX table metadata with old distributed table creation APIs

This change propagates MX table metadata to metadata workers on
`master_create_distributed_table` calls, and shard metadata on
`master_create_worker_shards` calls.
---
 .../commands/create_distributed_table.c       |  6 ++
 .../distributed/master/master_create_shards.c |  6 ++
 .../distributed/metadata/metadata_sync.c      | 31 +++++-
 src/include/distributed/metadata_sync.h       |  1 +
 .../regress/expected/multi_create_table.out   | 16 ---
 .../regress/expected/multi_metadata_sync.out  | 97 +++++++++++++++++++
 src/test/regress/sql/multi_create_table.sql   |  7 --
 src/test/regress/sql/multi_metadata_sync.sql  | 43 ++++++++
 8 files changed, 181 insertions(+), 26 deletions(-)

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index da09bbbf5..9129057fe 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -111,6 +111,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	ConvertToDistributedTable(distributedRelationId, distributionColumnName,
 							  distributionMethod, INVALID_COLOCATION_ID);
 
+	/* create the distributed table metadata on workers */
+	if (ShouldSyncTableMetadata(distributedRelationId))
+	{
+		CreateTableMetadataOnWorkers(distributedRelationId);
+	}
+
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index ecf3c97e7..f27e49aaf 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -30,6 +30,7 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_sync.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_shard.h"
@@ -71,6 +72,12 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
 	CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount,
 									 replicationFactor);
 
+	if (ShouldSyncTableMetadata(distributedTableId))
+	{
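+		/* send the new shard and placement metadata to workers with hasmetadata=true */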
+		CreateShardMetadataOnWorkers(distributedTableId);
+	}
+
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index c239bd3ab..1471abc07 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -988,9 +988,9 @@ HasMetadataWorkers(void)
 
 /*
  * CreateTableMetadataOnWorkers creates the list of commands needed to create the
- * given distributed table and sends these commands to all metadata workers i.e. workers
- * with hasmetadata=true. Before sending the commands, in order to prevent recursive
- * propagation, DDL propagation on workers are disabled with a
+ * distributed table and its shard metadata, and sends these commands to all metadata
+ * workers, i.e. workers with hasmetadata=true. Before sending the commands, in order to
+ * prevent recursive propagation, DDL propagation on workers is disabled with a
  * `SET citus.enable_ddl_propagation TO off;` command.
  */
 void
@@ -1010,3 +1010,28 @@ CreateTableMetadataOnWorkers(Oid relationId)
 		SendCommandToWorkers(WORKERS_WITH_METADATA, command);
 	}
 }
+
+
+/*
+ * CreateShardMetadataOnWorkers creates the shard metadata of the given relation on
+ * metadata workers. The function first creates commands for inserting the relevant
+ * tuples into pg_dist_shard and pg_dist_shard_placement, and then sends these commands
+ * to metadata workers.
+ */
+void
+CreateShardMetadataOnWorkers(Oid relationId)
+{
+	List *shardIntervalList = NIL;
+	List *commandList = NIL;
+	ListCell *commandCell = NULL;
+
+	shardIntervalList = LoadShardIntervalList(relationId);
+	commandList = ShardListInsertCommand(shardIntervalList);
+
+	foreach(commandCell, commandList)
+	{
+		char *command = (char *) lfirst(commandCell);
+
+		SendCommandToWorkers(WORKERS_WITH_METADATA, command);
+	}
+}
diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h
index f14a9dcd3..7c781b261 100644
--- a/src/include/distributed/metadata_sync.h
+++ b/src/include/distributed/metadata_sync.h
@@ -35,6 +35,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
 extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
 									 uint64 shardLength, char *nodeName, uint32 nodePort);
 extern void CreateTableMetadataOnWorkers(Oid relationId);
+extern void CreateShardMetadataOnWorkers(Oid relationId);
 
 
 #define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out
index 3120c835d..261535ec9 100644
--- a/src/test/regress/expected/multi_create_table.out
+++ b/src/test/regress/expected/multi_create_table.out
@@ -181,22 +181,6 @@ DETAIL: When master_create_distributed_table is called with citus.replication_m
 HINT: Use replication factor 1 or set citus.replication_model to statement and recreate the table
 DROP TABLE s_table;
 RESET citus.replication_model;
--- Show that it is not possible to create an mx table with the old
--- master_create_distributed_table function
-CREATE TABLE mx_table_test (col1 int, col2 text);
-SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
- master_create_distributed_table 
----------------------------------
- 
-(1 row)
-
-SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
- repmodel 
-----------
- c
-(1 row)
-
-DROP TABLE mx_table_test;
 -- Show that when replication factor > 1 the table is created as coordinator-replicated
 SET citus.shard_replication_factor TO 2;
 CREATE TABLE mx_table_test (col1 int, col2 text);
diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out
index b7f2c2a39..5c6eb0545 100644
--- a/src/test/regress/expected/multi_metadata_sync.out
+++ b/src/test/regress/expected/multi_metadata_sync.out
@@ -1346,6 +1346,102 @@ ORDER BY shardid, nodeport;
 \c - - - :master_port
 INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
 DROP TABLE tmp_shard_placement;
+-- Show that MX tables can be created with the old distributed table creation APIs
+-- First show that we can create a streaming distributed table
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1; -- Prevent the replication factor error
+CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
+SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+ repmodel 
+----------
+ s
+(1 row)
+
+-- Show that the metadata is created on the worker
+\c - - - :worker_1_port
+\d mx_old_api
+                       Table "public.mx_old_api"
+ Column |  Type   |                        Modifiers                        
+--------+---------+---------------------------------------------------------
+ a      | integer | 
+ b      | bigint  | not null default nextval('mx_old_api_b_seq'::regclass)
+
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+ repmodel 
+----------
+ s
+(1 row)
+
+-- Create the worker shards and show that the metadata is also on the workers
+\c - - - :master_port
+SELECT master_create_worker_shards('mx_old_api', 4, 1);
+ master_create_worker_shards 
+-----------------------------
+ 
+(1 row)
+
+INSERT INTO mx_old_api (a) VALUES (1);
+INSERT INTO mx_old_api (a) VALUES (2);
+INSERT INTO mx_old_api (a) VALUES (3);
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+ logicalrelid | shardid | nodename  | nodeport 
+--------------+---------+-----------+----------
+ mx_old_api   | 1310185 | localhost |    57637
+ mx_old_api   | 1310186 | localhost |    57638
+ mx_old_api   | 1310187 | localhost |    57637
+ mx_old_api   | 1310188 | localhost |    57638
+(4 rows)
+
+-- Show that the metadata is good on the worker and queries work from it
+\c - - - :worker_1_port
+INSERT INTO mx_old_api (a) VALUES (10);
+INSERT INTO mx_old_api (a) VALUES (20);
+INSERT INTO mx_old_api (a) VALUES (30);
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+ logicalrelid | shardid | nodename  | nodeport 
+--------------+---------+-----------+----------
+ mx_old_api   | 1310185 | localhost |    57637
+ mx_old_api   | 1310186 | localhost |    57638
+ mx_old_api   | 1310187 | localhost |    57637
+ mx_old_api   | 1310188 | localhost |    57638
+(4 rows)
+
+SELECT * FROM mx_old_api ORDER BY a;
+ a  |        b        
+----+-----------------
+  1 |               1
+  2 |               2
+  3 |               3
+ 10 | 281474976710657
+ 20 | 281474976710658
+ 30 | 281474976710659
+(6 rows)
+
+-- Get back to the master and check that we can see the rows we INSERTed from the worker
+\c - - - :master_port
+SELECT * FROM mx_old_api ORDER BY a;
+ a  |        b        
+----+-----------------
+  1 |               1
+  2 |               2
+  3 |               3
+ 10 | 281474976710657
+ 20 | 281474976710658
+ 30 | 281474976710659
+(6 rows)
+
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
@@ -1353,6 +1449,7 @@ NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_
 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
 DROP TABLE mx_testing_schema.mx_test_table;
 DROP TABLE mx_ref;
+DROP TABLE mx_old_api;
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
  stop_metadata_sync_to_node 
 ----------------------------
diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql
index 73b277a28..055df8359 100644
--- a/src/test/regress/sql/multi_create_table.sql
+++ b/src/test/regress/sql/multi_create_table.sql
@@ -138,13 +138,6 @@ DROP TABLE s_table;
 
 RESET citus.replication_model;
 
--- Show that it is not possible to create an mx table with the old
--- master_create_distributed_table function
-CREATE TABLE mx_table_test (col1 int, col2 text);
-SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
-SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
-DROP TABLE mx_table_test;
-
 -- Show that when replication factor > 1 the table is created as coordinator-replicated
 SET citus.shard_replication_factor TO 2;
 CREATE TABLE mx_table_test (col1 int, col2 text);
diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql
index adb6fa7fe..2281a6d09 100644
--- a/src/test/regress/sql/multi_metadata_sync.sql
+++ b/src/test/regress/sql/multi_metadata_sync.sql
@@ -594,12 +594,55 @@ ORDER BY shardid, nodeport;
 INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
 DROP TABLE tmp_shard_placement;
 
+-- Show that MX tables can be created with the old distributed table creation APIs
+-- First show that we can create a streaming distributed table
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1; -- Prevent the replication factor error
+CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
+SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+
+-- Show that the metadata is created on the worker
+\c - - - :worker_1_port
+\d mx_old_api
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+
+-- Create the worker shards and show that the metadata is also on the workers
+\c - - - :master_port
+SELECT master_create_worker_shards('mx_old_api', 4, 1);
+INSERT INTO mx_old_api (a) VALUES (1);
+INSERT INTO mx_old_api (a) VALUES (2);
+INSERT INTO mx_old_api (a) VALUES (3);
+
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+
+-- Show that the metadata is good on the worker and queries work from it
+\c - - - :worker_1_port
+INSERT INTO mx_old_api (a) VALUES (10);
+INSERT INTO mx_old_api (a) VALUES (20);
+INSERT INTO mx_old_api (a) VALUES (30);
+
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+
+SELECT * FROM mx_old_api ORDER BY a;
+
+-- Get back to the master and check that we can see the rows we INSERTed from the worker
+\c - - - :master_port
+SELECT * FROM mx_old_api ORDER BY a;
+
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
 DROP TABLE mx_testing_schema.mx_test_table;
 DROP TABLE mx_ref;
+DROP TABLE mx_old_api;
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
 SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
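
For reference, the behaviour the series aims for can be sketched as the following
psql session (illustrative only: the table name `events`, the shard count and the
worker-side query are examples, not part of the patches; the error text is the one
added in patch 1):

    -- on the coordinator
    SET citus.replication_model TO 'streaming';
    CREATE TABLE events (id bigint, payload text);
    SELECT master_create_distributed_table('events', 'id', 'hash');

    -- rejected after patch 1: streaming replicated tables must use RF=1
    SELECT master_create_worker_shards('events', 4, 2);

    -- allowed; after patch 2 the shard metadata is also sent to workers with hasmetadata=true
    SELECT master_create_worker_shards('events', 4, 1);

    -- on a metadata worker, the shard rows are now visible
    SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'events'::regclass;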