From 697d1468fe3c30ad844ce1beb3b69744f01e032d Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Mon, 10 Jan 2022 22:08:43 +0300 Subject: [PATCH] Use coordianated transaction for object prop --- .../distributed/commands/dependencies.c | 3 +- .../distributed/metadata/node_metadata.c | 29 +++++++++++------- .../distributed/utils/reference_table_utils.c | 8 ++--- src/include/distributed/metadata_sync.h | 1 + src/test/regress/input/multi_copy.source | 2 -- .../regress/sql/multi_colocation_utils.sql | 30 ++++++------------- src/test/regress/sql/multi_metadata_sync.sql | 27 ++++------------- 7 files changed, 36 insertions(+), 64 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 4fd1f813a..1ad5be0ea 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -405,8 +405,7 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), ddlCommands); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 045125e06..7acea3623 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -653,14 +653,13 @@ SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) } } - char *currentUser = CurrentUserName(); multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, multipleTableIntegrationCommandList); multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList, ENABLE_DDL_PROPAGATION); SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, workerNode->workerPort, - currentUser, + CitusExtensionOwnerName(), multipleTableIntegrationCommandList); } @@ -889,10 +888,10 @@ ClearDistributedTablesFromNode(WorkerNode *workerNode) list_make1( ENABLE_DDL_PROPAGATION)); - SendCommandListToWorkerOutsideTransaction(workerNode->workerName, - workerNode->workerPort, - CitusExtensionOwnerName(), - clearDistributedTablesCommandList); + SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + CitusExtensionOwnerName(), + clearDistributedTablesCommandList); } @@ -905,7 +904,13 @@ ClearDistributedObjectsFromNode(WorkerNode *workerNode) List *clearDistTableInfoCommandList = NIL; clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, - REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND); + DELETE_ALL_PARTITIONS); + + clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, + DELETE_ALL_SHARDS); + + clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, + DELETE_ALL_PLACEMENTS); clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); @@ -988,10 +993,10 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); /* send commands to new workers*/ - SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CitusExtensionOwnerName(), - ddlCommands); + SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, + newWorkerNode->workerPort, + CitusExtensionOwnerName(), + ddlCommands); } } @@ -1203,6 +1208,8 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + EnsureSuperUser(); + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 3f8d2fd5c..daf218cf7 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -350,7 +350,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort); - char *tableOwner = TableOwner(shardInterval->relationId); if (targetPlacement != NULL) { @@ -370,8 +369,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) nodePort))); EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommandList); int32 groupId = GroupForNode(nodeName, nodePort); uint64 placementId = GetNextPlacementId(); @@ -594,11 +592,9 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) /* create foreign constraints between reference tables */ foreach_ptr(shardInterval, referenceShardIntervalList) { - char *tableOwner = TableOwner(shardInterval->relationId); List *commandList = CopyShardForeignConstraintCommandList(shardInterval); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, - commandList); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), commandList); } } } diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 82bf859e1..f1df7aeb4 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -74,6 +74,7 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); #define DELETE_ALL_PLACEMENTS "TRUNCATE pg_dist_placement CASCADE" #define DELETE_ALL_SHARDS "TRUNCATE pg_dist_shard CASCADE" #define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object" +#define DELETE_ALL_PARTITIONS "TRUNCATE pg_dist_partition CASCADE" #define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \ "SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition" #define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \ diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index f9abb77dc..0b3067be6 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -498,9 +498,7 @@ SELECT shardid, nodename, nodeport WHERE logicalrelid = 'numbers_append'::regclass order by placementid; -- add the node back -DROP FOREIGN TABLE foreign_table_to_distribute; SELECT 1 FROM master_activate_node('localhost', :worker_1_port); -reset citus.log_remote_commands; RESET client_min_messages; RESET citus.shard_replication_factor; -- add two new shards and verify they are created at both workers diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index a87999761..0c46538aa 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -71,34 +71,27 @@ CREATE TABLE table1_group1 ( id int ); SELECT master_create_distributed_table('table1_group1', 'id', 'hash'); SELECT master_create_worker_shards('table1_group1', 4, 2); -select last_value from pg_dist_colocationid_seq ; - CREATE TABLE table2_group1 ( id int ); SELECT master_create_distributed_table('table2_group1', 'id', 'hash'); SELECT master_create_worker_shards('table2_group1', 4, 2); -table pg_dist_colocation; -table pg_dist_partition; + CREATE TABLE table3_group2 ( id int ); SELECT master_create_distributed_table('table3_group2', 'id', 'hash'); SELECT master_create_worker_shards('table3_group2', 4, 2); -select last_value from pg_dist_colocationid_seq ; CREATE TABLE table4_group2 ( id int ); SELECT master_create_distributed_table('table4_group2', 'id', 'hash'); SELECT master_create_worker_shards('table4_group2', 4, 2); -select last_value from pg_dist_colocationid_seq ; CREATE TABLE table5_groupX ( id int ); SELECT master_create_distributed_table('table5_groupX', 'id', 'hash'); SELECT master_create_worker_shards('table5_groupX', 4, 2); -table pg_dist_colocation; -table pg_dist_partition; + CREATE TABLE table6_append ( id int ); SELECT master_create_distributed_table('table6_append', 'id', 'append'); -SELECT master_create_empty_shard('table6_append');select last_value from pg_dist_colocationid_seq ; - -SELECT master_create_empty_shard('table6_append');select last_value from pg_dist_colocationid_seq ; +SELECT master_create_empty_shard('table6_append'); +SELECT master_create_empty_shard('table6_append'); -- make table1_group1 and table2_group1 co-located manually @@ -108,8 +101,6 @@ SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1'); SELECT get_table_colocation_id('table1_group1'); SELECT get_table_colocation_id('table5_groupX'); SELECT get_table_colocation_id('table6_append'); -table pg_dist_colocation; -table pg_dist_partition; -- check self table co-location SELECT tables_colocated('table1_group1', 'table1_group1'); SELECT tables_colocated('table5_groupX', 'table5_groupX'); @@ -124,8 +115,6 @@ SELECT tables_colocated('table1_group1', 'table3_group2'); -- check table co-location with invalid co-location group SELECT tables_colocated('table1_group1', 'table5_groupX'); SELECT tables_colocated('table1_group1', 'table6_append'); -table pg_dist_colocation; -table pg_dist_partition; -- check self shard co-location SELECT shards_colocated(1300000, 1300000); SELECT shards_colocated(1300016, 1300016); @@ -139,8 +128,7 @@ SELECT shards_colocated(1300000, 1300001); -- check shard co-location with different co-location group SELECT shards_colocated(1300000, 1300005); -table pg_dist_colocation; -table pg_dist_partition; + -- check shard co-location with invalid co-location group SELECT shards_colocated(1300000, 1300016); SELECT shards_colocated(1300000, 1300020); @@ -161,8 +149,6 @@ SELECT find_shard_interval_index(1300001); SELECT find_shard_interval_index(1300002); SELECT find_shard_interval_index(1300003); SELECT find_shard_interval_index(1300016); -table pg_dist_colocation; -table pg_dist_partition; -- check external colocation API SELECT count(*) FROM pg_dist_partition WHERE colocationid IN (4, 5); @@ -192,8 +178,6 @@ UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='table2_groupB'::re SET citus.shard_replication_factor to DEFAULT; -- change partition column type -table pg_dist_colocation; -table pg_dist_partition; set citus.log_remote_commands to true; CREATE TABLE table1_groupC ( id text ); SELECT create_distributed_table('table1_groupC', 'id'); @@ -570,3 +554,7 @@ DROP TABLE none; DROP TABLE ref; DROP TABLE local_table; DROP FOREIGN TABLE table3_groupD CASCADE; + +-- Resync remote nodes as well +SELECT citus_activate_node('localhost', :worker_1_port); +SELECT citus_activate_node('localhost', :worker_2_port); diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index dbf87d90c..3d11ff14d 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -6,10 +6,6 @@ -- metadata changes to MX tables. -- Turn metadata sync off at first -\c - - - :worker_1_port -table pg_dist_partition; -table pg_dist_node; - \c - - - :master_port SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); @@ -105,27 +101,14 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 8888; -- Add a node to another cluster to make sure it's also synced SELECT master_add_secondary_node('localhost', 8889, 'localhost', :worker_1_port, nodecluster => 'second-cluster'); -\c - - - :worker_1_port -table pg_dist_partition; -table pg_dist_node; -table pg_dist_shard; -table pg_dist_shard_placement; - \c - - - :master_port -- Run start_metadata_sync_to_node and citus_activate_node and check that it marked hasmetadata for that worker -table pg_dist_partition; -\d -set citus.log_remote_commands to true; -set citus.worker_min_messages to debug5; SELECT citus_activate_node('localhost', :worker_1_port); -reset citus.log_remote_commands; -reset citus.worker_min_messages; SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port; -- Check that the metadata has been copied to the worker \c - - - :worker_1_port -table pg_dist_partition; SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_node ORDER BY nodeid; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; @@ -189,12 +172,12 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; -- Make sure that citus_activate_node can be called inside a transaction and rollbacked ---\c - - - :master_port ---BEGIN; ---SELECT citus_activate_node('localhost', :worker_2_port); ---ROLLBACK; +\c - - - :master_port +BEGIN; +SELECT citus_activate_node('localhost', :worker_2_port); +ROLLBACK; ---SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; -- Check that the distributed table can be queried from the worker \c - - - :master_port