diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index b0d34d0af..1515003b2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -63,11 +63,11 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); /* * start_metadata_sync_to_node function creates the metadata in a worker for preparing the - * worker for accepting MX-table queries. The function first sets the localGroupId of the - * worker so that the worker knows which tuple in pg_dist_node table represents itself. - * After that, SQL statetemens for re-creating metadata about mx distributed - * tables are sent to the worker. Finally, the hasmetadata column of the target node in - * pg_dist_node is marked as true. + * worker for accepting queries. The function first sets the localGroupId of the worker + * so that the worker knows which tuple in pg_dist_node table represents itself. After + * that, SQL statements for re-creating metadata of MX-eligible distributed tables are + * sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node + * is marked as true. */ Datum start_metadata_sync_to_node(PG_FUNCTION_ARGS) @@ -132,7 +132,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) /* * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node * to false in pg_dist_node table, thus indicating that the specified worker node does not - * receive DDL changes anymore and cannot be used for issuing mx queries. + * receive DDL changes anymore and cannot be used for issuing queries. */ Datum stop_metadata_sync_to_node(PG_FUNCTION_ARGS) @@ -159,19 +159,24 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) /* - * ShouldSyncTableMetadata checks if a distributed table has streaming replication model - * and hash distribution. In that case the distributed table is considered an MX table, - * and its metadata is required to exist on the worker nodes. 
+ * ShouldSyncTableMetadata checks if the metadata of a distributed table should be + * propagated to metadata workers, i.e. the table is an MX table or reference table. + * Tables with streaming replication model (which means RF=1) and hash distribution are + * considered as MX tables while tables with none distribution are reference tables. */ bool ShouldSyncTableMetadata(Oid relationId) { DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId); - bool usesHashDistribution = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH); - bool usesStreamingReplication = + + bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH); + bool streamingReplicated = (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING); - if (usesStreamingReplication && usesHashDistribution) + bool mxTable = (streamingReplicated && hashDistributed); + bool referenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE); + + if (mxTable || referenceTable) { return true; } @@ -199,7 +204,7 @@ MetadataCreateCommands(void) { List *metadataSnapshotCommandList = NIL; List *distributedTableList = DistributedTableList(); - List *mxTableList = NIL; + List *propagatedTableList = NIL; List *workerNodeList = WorkerNodeList(); ListCell *distributedTableCell = NULL; char *nodeListInsertCommand = NULL; @@ -209,19 +214,19 @@ MetadataCreateCommands(void) metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, nodeListInsertCommand); - /* create the list of mx tables */ + /* create the list of tables whose metadata will be created */ foreach(distributedTableCell, distributedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); if (ShouldSyncTableMetadata(cacheEntry->relationId)) { - mxTableList = lappend(mxTableList, cacheEntry); + propagatedTableList = lappend(propagatedTableList, cacheEntry); } } - /* create the mx tables, but not the metadata */ - foreach(distributedTableCell, mxTableList) + /* create the 
tables, but not the metadata */ + foreach(distributedTableCell, propagatedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); @@ -240,7 +245,7 @@ MetadataCreateCommands(void) } /* construct the foreign key constraints after all tables are created */ - foreach(distributedTableCell, mxTableList) + foreach(distributedTableCell, propagatedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); @@ -253,7 +258,7 @@ MetadataCreateCommands(void) } /* after all tables are created, create the metadata */ - foreach(distributedTableCell, mxTableList) + foreach(distributedTableCell, propagatedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); @@ -323,7 +328,7 @@ GetDistributedTableDDLEvents(Oid relationId) metadataCommand = DistributionCreateCommand(cacheEntry); commandList = lappend(commandList, metadataCommand); - /* commands to create the truncate trigger of the mx table */ + /* commands to create the truncate trigger of the table */ truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId); commandList = lappend(commandList, truncateTriggerCreateCommand); @@ -597,7 +602,6 @@ ShardListInsertCommand(List *shardIntervalList) appendStringInfo(minHashToken, "NULL"); } - if (shardInterval->maxValueExists) { appendStringInfo(maxHashToken, "'%d'", DatumGetInt32( diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 10b0833d1..be0c3fe8d 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -1221,19 +1221,56 @@ WHERE SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset --- Cleanup -SELECT worker_drop_distributed_table('mx_ref'::regclass); - worker_drop_distributed_table -------------------------------- - -(1 row) +-- Check that DDL commands are 
propagated to reference tables on workers +\c - - - :master_port +ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +CREATE INDEX mx_ref_index ON mx_ref(col_1); +\d mx_ref + Table "public.mx_ref" + Column | Type | Modifiers +--------+---------+----------- + col_1 | integer | + col_2 | text | + col_3 | numeric | default 0 +Indexes: + "mx_ref_index" btree (col_1) +\c - - - :worker_1_port +\d mx_ref + Table "public.mx_ref" + Column | Type | Modifiers +--------+---------+----------- + col_1 | integer | + col_2 | text | + col_3 | numeric | default 0 +Indexes: + "mx_ref_index" btree (col_1) + + +-- Check that metadata is cleaned successfully upon drop table +\c - - - :master_port +DROP TABLE mx_ref; +\d mx_ref +\c - - - :worker_1_port +\d mx_ref +SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue +--------------+---------+--------------+---------------+--------------- +(0 rows) + +SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+----------+----------+------------- +(0 rows) + +-- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; -DROP TABLE mx_ref; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); stop_metadata_sync_to_node ---------------------------- diff --git a/src/test/regress/expected/multi_modifying_xacts.out index aecaf0191..c8308dc38 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ 
b/src/test/regress/expected/multi_modifying_xacts.out @@ -1018,6 +1018,9 @@ ORDER BY s.logicalrelid, sp.shardstate; reference_failure_test | 1 | 2 (1 row) +-- connect back to the worker and rename the test_user back +\c - :default_user - :worker_1_port +ALTER USER test_user_new RENAME TO test_user; -- connect back to the master with the proper user to continue the tests \c - :default_user - :master_port DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 1d4be75d6..cdb343e7e 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -28,6 +28,13 @@ SELECT create_distributed_table('mx_table_2', 'col_1'); (1 row) +CREATE TABLE mx_ref_table (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref_table'); + create_reference_table +------------------------ + +(1 row) + -- Check that the created tables are colocated MX tables SELECT logicalrelid, repmodel, colocationid FROM pg_dist_partition @@ -46,6 +53,9 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); (1 row) COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); +INSERT INTO mx_ref_table VALUES (-37, 'morbi'); +INSERT INTO mx_ref_table VALUES (-78, 'sapien'); +INSERT INTO mx_ref_table VALUES (-34, 'augue'); SELECT * FROM mx_table ORDER BY col_1; col_1 | col_2 | col_3 -------+----------+------- @@ -98,6 +108,35 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; 5 (1 row) +-- INSERT/UPDATE/DELETE on reference tables +SELECT * FROM mx_ref_table ORDER BY col_1; + col_1 | col_2 +-------+-------- + -78 | sapien + -37 | morbi + -34 | augue +(3 rows) + +INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum'); +ERROR: cannot perform distributed planning for the given 
modification +DETAIL: Modifications to reference tables are supported only from the schema node. +UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Modifications to reference tables are supported only from the schema node. +DELETE FROM mx_ref_table WHERE col_1 = -78; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Modifications to reference tables are supported only from the schema node. +SELECT * FROM mx_ref_table ORDER BY col_1; + col_1 | col_2 +-------+-------- + -78 | sapien + -37 | morbi + -34 | augue +(3 rows) + +\c - - - :master_port +DROP TABLE mx_ref_table; +\c - - - :worker_1_port -- DDL commands \d mx_table Table "public.mx_table" diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index e052af6dd..795e48247 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -525,14 +525,30 @@ WHERE SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset --- Cleanup -SELECT worker_drop_distributed_table('mx_ref'::regclass); +-- Check that DDL commands are propagated to reference tables on workers +\c - - - :master_port +ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0; +CREATE INDEX mx_ref_index ON mx_ref(col_1); +\d mx_ref +\c - - - :worker_1_port +\d mx_ref + +-- Check that metadata is cleaned successfully upon drop table +\c - - - :master_port +DROP TABLE mx_ref; +\d mx_ref + +\c - - - :worker_1_port +\d mx_ref +SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid; +SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid; + +-- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; -DROP TABLE mx_ref; SELECT stop_metadata_sync_to_node('localhost', 
:worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 8a2497e57..634740cc6 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -758,6 +758,10 @@ AND s.logicalrelid = 'reference_failure_test'::regclass GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; +-- connect back to the worker and rename the test_user back +\c - :default_user - :worker_1_port +ALTER USER test_user_new RENAME TO test_user; + -- connect back to the master with the proper user to continue the tests \c - :default_user - :master_port DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index e629c2926..c171ce359 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -25,6 +25,9 @@ SELECT create_distributed_table('mx_table', 'col_1'); CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL); SELECT create_distributed_table('mx_table_2', 'col_1'); +CREATE TABLE mx_ref_table (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref_table'); + -- Check that the created tables are colocated MX tables SELECT logicalrelid, repmodel, colocationid FROM pg_dist_partition @@ -41,6 +44,10 @@ COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); 65832, 'amet' \. 
+INSERT INTO mx_ref_table VALUES (-37, 'morbi'); +INSERT INTO mx_ref_table VALUES (-78, 'sapien'); +INSERT INTO mx_ref_table VALUES (-34, 'augue'); + SELECT * FROM mx_table ORDER BY col_1; -- Try commands from metadata worker @@ -73,6 +80,17 @@ INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; +-- INSERT/UPDATE/DELETE on reference tables +SELECT * FROM mx_ref_table ORDER BY col_1; +INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum'); +UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37; +DELETE FROM mx_ref_table WHERE col_1 = -78; +SELECT * FROM mx_ref_table ORDER BY col_1; + +\c - - - :master_port +DROP TABLE mx_ref_table; +\c - - - :worker_1_port + -- DDL commands \d mx_table CREATE INDEX mx_test_index ON mx_table(col_1);