diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index da09bbbf5..9129057fe 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -111,6 +111,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	ConvertToDistributedTable(distributedRelationId, distributionColumnName,
 							  distributionMethod, INVALID_COLOCATION_ID);
 
+	/* create the distributed table metadata on workers */
+	if (ShouldSyncTableMetadata(distributedRelationId))
+	{
+		CreateTableMetadataOnWorkers(distributedRelationId);
+	}
+
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index 7c5a21011..f27e49aaf 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -30,6 +30,7 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_sync.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_shard.h"
@@ -71,6 +72,11 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
 	CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount,
 									 replicationFactor);
 
+	if (ShouldSyncTableMetadata(distributedTableId))
+	{
+		CreateShardMetadataOnWorkers(distributedTableId);
+	}
+
 	PG_RETURN_VOID();
 }
 
@@ -97,6 +103,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 	uint64 hashTokenIncrement = 0;
 	List *existingShardList = NIL;
 	int64 shardIndex = 0;
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
 
 	/* make sure table is hash partitioned */
 	CheckHashPartitionedTable(distributedTableId);
@@ -138,6 +145,24 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 						errmsg("replication_factor must be positive")));
 	}
 
+	/* make sure that RF=1 if the table is streaming replicated */
+	if (cacheEntry->replicationModel == REPLICATION_MODEL_STREAMING &&
+		replicationFactor > 1)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("using replication factor %d with streaming "
+							   "replicated tables is not supported",
+							   replicationFactor),
+						errdetail("When master_create_distributed_table is called with "
+								  "citus.replication_model set to streaming, the table "
+								  "is marked for streaming replication and the shard "
+								  "replication factor of streaming replicated tables "
+								  "must be 1."),
+						errhint("Use replication factor 1 or set "
+								"citus.replication_model to statement and recreate "
+								"the table.")));
+	}
+
 	/* calculate the split of the hash space */
 	hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
 
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index c239bd3ab..1471abc07 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -988,9 +988,9 @@ HasMetadataWorkers(void)
 
 /*
  * CreateTableMetadataOnWorkers creates the list of commands needed to create the
- * given distributed table and sends these commands to all metadata workers i.e. workers
- * with hasmetadata=true. Before sending the commands, in order to prevent recursive
- * propagation, DDL propagation on workers are disabled with a
+ * given distributed table and the metadata for it and its shards, and sends these
+ * commands to all metadata workers, i.e. workers with hasmetadata=true. To prevent
+ * recursive propagation, DDL propagation on workers is first disabled with a
  * `SET citus.enable_ddl_propagation TO off;` command.
  */
 void
@@ -1010,3 +1010,28 @@ CreateTableMetadataOnWorkers(Oid relationId)
 		SendCommandToWorkers(WORKERS_WITH_METADATA, command);
 	}
 }
+
+
+/*
+ * CreateShardMetadataOnWorkers creates the shard metadata of the given relation on
+ * metadata workers. The function first builds the commands that insert the relevant
+ * tuples into pg_dist_shard and pg_dist_shard_placement, and then sends these commands
+ * to the metadata workers.
+ */
+void
+CreateShardMetadataOnWorkers(Oid relationId)
+{
+	List *shardIntervalList = NIL;
+	List *commandList = NIL;
+	ListCell *commandCell = NULL;
+
+	shardIntervalList = LoadShardIntervalList(relationId);
+	commandList = ShardListInsertCommand(shardIntervalList);
+
+	foreach(commandCell, commandList)
+	{
+		char *command = (char *) lfirst(commandCell);
+
+		SendCommandToWorkers(WORKERS_WITH_METADATA, command);
+	}
+}
diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h
index f14a9dcd3..7c781b261 100644
--- a/src/include/distributed/metadata_sync.h
+++ b/src/include/distributed/metadata_sync.h
@@ -35,6 +35,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
 extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
 									 uint64 shardLength, char *nodeName, uint32 nodePort);
 extern void CreateTableMetadataOnWorkers(Oid relationId);
+extern void CreateShardMetadataOnWorkers(Oid relationId);
 
 
 #define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out
index 266b1d794..261535ec9 100644
--- a/src/test/regress/expected/multi_create_table.out
+++ b/src/test/regress/expected/multi_create_table.out
@@ -160,23 +160,27 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
 (1 row)
 
 DROP TABLE mx_table_test;
-RESET citus.replication_model;
--- Show that it is not possible to create an mx table with the old
--- master_create_distributed_table function
-CREATE TABLE mx_table_test (col1 int, col2 text);
-SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
- master_create_distributed_table 
----------------------------------
- 
-(1 row)
-
-SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
+-- Show that master_create_distributed_table honors citus.replication_model GUC
+CREATE TABLE s_table(a int);
+SELECT master_create_distributed_table('s_table', 'a', 'hash');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
  repmodel 
 ----------
- c
+ s
 (1 row)
 
-DROP TABLE mx_table_test;
+-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
+SELECT master_create_worker_shards('s_table', 4, 2);
+ERROR:  using replication factor 2 with streaming replicated tables is not supported
+DETAIL:  When master_create_distributed_table is called with citus.replication_model set to streaming, the table is marked for streaming replication and the shard replication factor of streaming replicated tables must be 1.
+HINT:  Use replication factor 1 or set citus.replication_model to statement and recreate the table.
+DROP TABLE s_table;
+RESET citus.replication_model;
 -- Show that when replication factor > 1 the table is created as coordinator-replicated
 SET citus.shard_replication_factor TO 2;
 CREATE TABLE mx_table_test (col1 int, col2 text);
diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out
index a297de3fa..9d0357aa4 100644
--- a/src/test/regress/expected/multi_metadata_sync.out
+++ b/src/test/regress/expected/multi_metadata_sync.out
@@ -1346,6 +1346,102 @@ ORDER BY shardid, nodeport;
 \c - - - :master_port
 INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
 DROP TABLE tmp_shard_placement;
+-- Show that MX tables can be created with the old distributed table creation APIs
+-- First show that we can create a streaming distributed table
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1; -- Prevent an unrelated replication factor error
+CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
+SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+ repmodel 
+----------
+ s
+(1 row)
+
+-- Show that the metadata is created on the worker
+\c - - - :worker_1_port
+\d mx_old_api
+   Table "public.mx_old_api"
+ Column |  Type   |                        Modifiers                        
+--------+---------+---------------------------------------------------------
+ a      | integer | 
+ b      | bigint  | not null default nextval('mx_old_api_b_seq'::regclass)
+
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+ repmodel 
+----------
+ s
+(1 row)
+
+-- Create the worker shards and show that the metadata is also on the workers
+\c - - - :master_port
+SELECT master_create_worker_shards('mx_old_api', 4, 1);
+ master_create_worker_shards 
+-----------------------------
+ 
+(1 row)
+
+INSERT INTO mx_old_api (a) VALUES (1);
+INSERT INTO mx_old_api (a) VALUES (2);
+INSERT INTO mx_old_api (a) VALUES (3);
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+ logicalrelid | shardid | nodename  | nodeport 
+--------------+---------+-----------+----------
+ mx_old_api   | 1310185 | localhost |    57637
+ mx_old_api   | 1310186 | localhost |    57638
+ mx_old_api   | 1310187 | localhost |    57637
+ mx_old_api   | 1310188 | localhost |    57638
+(4 rows)
+
+-- Show that the metadata is good on the worker and queries work from it
+\c - - - :worker_1_port
+INSERT INTO mx_old_api (a) VALUES (10);
+INSERT INTO mx_old_api (a) VALUES (20);
+INSERT INTO mx_old_api (a) VALUES (30);
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+ logicalrelid | shardid | nodename  | nodeport 
+--------------+---------+-----------+----------
+ mx_old_api   | 1310185 | localhost |    57637
+ mx_old_api   | 1310186 | localhost |    57638
+ mx_old_api   | 1310187 | localhost |    57637
+ mx_old_api   | 1310188 | localhost |    57638
+(4 rows)
+
+SELECT * FROM mx_old_api ORDER BY a;
+ a  |        b        
+----+-----------------
+  1 |               1
+  2 |               2
+  3 |               3
+ 10 | 281474976710657
+ 20 | 281474976710658
+ 30 | 281474976710659
+(6 rows)
+
+-- Get back to the master and check that we can see the rows we INSERTed from the worker
+\c - - - :master_port
+SELECT * FROM mx_old_api ORDER BY a;
+ a  |        b        
+----+-----------------
+  1 |               1
+  2 |               2
+  3 |               3
+ 10 | 281474976710657
+ 20 | 281474976710658
+ 30 | 281474976710659
+(6 rows)
+
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
@@ -1353,6 +1449,7 @@ NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_
 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
 DROP TABLE mx_testing_schema.mx_test_table;
 DROP TABLE mx_ref;
+DROP TABLE mx_old_api;
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
  stop_metadata_sync_to_node 
 ----------------------------
diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql
index 5bb175a3f..055df8359 100644
--- a/src/test/regress/sql/multi_create_table.sql
+++ b/src/test/regress/sql/multi_create_table.sql
@@ -126,14 +126,17 @@ SELECT create_distributed_table('mx_table_test', 'col1');
 SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
 DROP TABLE mx_table_test;
 
-RESET citus.replication_model;
+-- Show that master_create_distributed_table honors citus.replication_model GUC
+CREATE TABLE s_table(a int);
+SELECT master_create_distributed_table('s_table', 'a', 'hash');
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
 
--- Show that it is not possible to create an mx table with the old
--- master_create_distributed_table function
-CREATE TABLE mx_table_test (col1 int, col2 text);
-SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
-SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
-DROP TABLE mx_table_test;
+-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
+SELECT master_create_worker_shards('s_table', 4, 2);
+
+DROP TABLE s_table;
+
+RESET citus.replication_model;
 
 -- Show that when replication factor > 1 the table is created as coordinator-replicated
 SET citus.shard_replication_factor TO 2;
diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql
index adb6fa7fe..2281a6d09 100644
--- a/src/test/regress/sql/multi_metadata_sync.sql
+++ b/src/test/regress/sql/multi_metadata_sync.sql
@@ -594,12 +594,55 @@ ORDER BY shardid, nodeport;
 INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
 DROP TABLE tmp_shard_placement;
 
+-- Show that MX tables can be created with the old distributed table creation APIs
+-- First show that we can create a streaming distributed table
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1; -- Prevent an unrelated replication factor error
+CREATE TABLE mx_old_api(a INT, b BIGSERIAL);
+SELECT master_create_distributed_table('mx_old_api', 'a', 'hash');
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+
+-- Show that the metadata is created on the worker
+\c - - - :worker_1_port
+\d mx_old_api
+SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_old_api'::regclass;
+
+-- Create the worker shards and show that the metadata is also on the workers
+\c - - - :master_port
+SELECT master_create_worker_shards('mx_old_api', 4, 1);
+INSERT INTO mx_old_api (a) VALUES (1);
+INSERT INTO mx_old_api (a) VALUES (2);
+INSERT INTO mx_old_api (a) VALUES (3);
+
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+
+-- Show that the metadata is good on the worker and queries work from it
+\c - - - :worker_1_port
+INSERT INTO mx_old_api (a) VALUES (10);
+INSERT INTO mx_old_api (a) VALUES (20);
+INSERT INTO mx_old_api (a) VALUES (30);
+
+SELECT logicalrelid, shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_old_api'::regclass
+ORDER BY shardid, nodename, nodeport;
+
+SELECT * FROM mx_old_api ORDER BY a;
+
+-- Get back to the master and check that we can see the rows we INSERTed from the worker
+\c - - - :master_port
+SELECT * FROM mx_old_api ORDER BY a;
+
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
 DROP TABLE mx_testing_schema.mx_test_table;
 DROP TABLE mx_ref;
+DROP TABLE mx_old_api;
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
 SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
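
For reference, a minimal usage sketch of the code path this change enables, mirroring the regression tests above. It assumes a coordinator with at least one metadata worker (hasmetadata = true); the table name 'events' and its columns are hypothetical and used only for illustration.

-- Assumes citus.replication_model = 'streaming' and a shard replication factor of 1;
-- 'events' and its columns are illustrative names, not part of the patch.
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
CREATE TABLE events (event_id bigint, payload text);
-- Old-style API: distribute the table, then create its shards. With this patch the
-- table, shard, and placement metadata are also propagated to metadata workers.
SELECT master_create_distributed_table('events', 'event_id', 'hash');
SELECT master_create_worker_shards('events', 4, 1);
-- Requesting a replication factor above 1 for a streaming replicated table errors out:
-- SELECT master_create_worker_shards('events', 4, 2);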