Use coordianated transaction for object prop

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-10 22:08:43 +03:00
parent 4a7a8b8835
commit 697d1468fe
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
7 changed files with 36 additions and 64 deletions

View File

@ -405,8 +405,7 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands);
CitusExtensionOwnerName(), ddlCommands);
} }

View File

@ -653,14 +653,13 @@ SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode)
} }
} }
char *currentUser = CurrentUserName();
multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION,
multipleTableIntegrationCommandList); multipleTableIntegrationCommandList);
multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList, multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort, workerNode->workerPort,
currentUser, CitusExtensionOwnerName(),
multipleTableIntegrationCommandList); multipleTableIntegrationCommandList);
} }
@ -889,7 +888,7 @@ ClearDistributedTablesFromNode(WorkerNode *workerNode)
list_make1( list_make1(
ENABLE_DDL_PROPAGATION)); ENABLE_DDL_PROPAGATION));
SendCommandListToWorkerOutsideTransaction(workerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort, workerNode->workerPort,
CitusExtensionOwnerName(), CitusExtensionOwnerName(),
clearDistributedTablesCommandList); clearDistributedTablesCommandList);
@ -905,7 +904,13 @@ ClearDistributedObjectsFromNode(WorkerNode *workerNode)
List *clearDistTableInfoCommandList = NIL; List *clearDistTableInfoCommandList = NIL;
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND); DELETE_ALL_PARTITIONS);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_SHARDS);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_PLACEMENTS);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS); DELETE_ALL_DISTRIBUTED_OBJECTS);
@ -988,7 +993,7 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode)
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
/* send commands to new workers*/ /* send commands to new workers*/
SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName,
newWorkerNode->workerPort, newWorkerNode->workerPort,
CitusExtensionOwnerName(), CitusExtensionOwnerName(),
ddlCommands); ddlCommands);
@ -1203,6 +1208,8 @@ ActivateNode(char *nodeName, int nodePort)
{ {
bool isActive = true; bool isActive = true;
EnsureSuperUser();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);

View File

@ -350,7 +350,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort); nodeName, nodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
if (targetPlacement != NULL) if (targetPlacement != NULL)
{ {
@ -370,8 +369,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
nodePort))); nodePort)));
EnsureNoModificationsHaveBeenDone(); EnsureNoModificationsHaveBeenDone();
SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommandList);
ddlCommandList);
int32 groupId = GroupForNode(nodeName, nodePort); int32 groupId = GroupForNode(nodeName, nodePort);
uint64 placementId = GetNextPlacementId(); uint64 placementId = GetNextPlacementId();
@ -594,11 +592,9 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
/* create foreign constraints between reference tables */ /* create foreign constraints between reference tables */
foreach_ptr(shardInterval, referenceShardIntervalList) foreach_ptr(shardInterval, referenceShardIntervalList)
{ {
char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = CopyShardForeignConstraintCommandList(shardInterval); List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), commandList);
commandList);
} }
} }
} }

View File

@ -74,6 +74,7 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
#define DELETE_ALL_PLACEMENTS "TRUNCATE pg_dist_placement CASCADE" #define DELETE_ALL_PLACEMENTS "TRUNCATE pg_dist_placement CASCADE"
#define DELETE_ALL_SHARDS "TRUNCATE pg_dist_shard CASCADE" #define DELETE_ALL_SHARDS "TRUNCATE pg_dist_shard CASCADE"
#define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object" #define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object"
#define DELETE_ALL_PARTITIONS "TRUNCATE pg_dist_partition CASCADE"
#define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \ #define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \
"SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition" "SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition"
#define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \ #define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \

View File

@ -498,9 +498,7 @@ SELECT shardid, nodename, nodeport
WHERE logicalrelid = 'numbers_append'::regclass order by placementid; WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
-- add the node back -- add the node back
DROP FOREIGN TABLE foreign_table_to_distribute;
SELECT 1 FROM master_activate_node('localhost', :worker_1_port); SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
reset citus.log_remote_commands;
RESET client_min_messages; RESET client_min_messages;
RESET citus.shard_replication_factor; RESET citus.shard_replication_factor;
-- add two new shards and verify they are created at both workers -- add two new shards and verify they are created at both workers

View File

@ -71,34 +71,27 @@ CREATE TABLE table1_group1 ( id int );
SELECT master_create_distributed_table('table1_group1', 'id', 'hash'); SELECT master_create_distributed_table('table1_group1', 'id', 'hash');
SELECT master_create_worker_shards('table1_group1', 4, 2); SELECT master_create_worker_shards('table1_group1', 4, 2);
select last_value from pg_dist_colocationid_seq ;
CREATE TABLE table2_group1 ( id int ); CREATE TABLE table2_group1 ( id int );
SELECT master_create_distributed_table('table2_group1', 'id', 'hash'); SELECT master_create_distributed_table('table2_group1', 'id', 'hash');
SELECT master_create_worker_shards('table2_group1', 4, 2); SELECT master_create_worker_shards('table2_group1', 4, 2);
table pg_dist_colocation;
table pg_dist_partition;
CREATE TABLE table3_group2 ( id int ); CREATE TABLE table3_group2 ( id int );
SELECT master_create_distributed_table('table3_group2', 'id', 'hash'); SELECT master_create_distributed_table('table3_group2', 'id', 'hash');
SELECT master_create_worker_shards('table3_group2', 4, 2); SELECT master_create_worker_shards('table3_group2', 4, 2);
select last_value from pg_dist_colocationid_seq ;
CREATE TABLE table4_group2 ( id int ); CREATE TABLE table4_group2 ( id int );
SELECT master_create_distributed_table('table4_group2', 'id', 'hash'); SELECT master_create_distributed_table('table4_group2', 'id', 'hash');
SELECT master_create_worker_shards('table4_group2', 4, 2); SELECT master_create_worker_shards('table4_group2', 4, 2);
select last_value from pg_dist_colocationid_seq ;
CREATE TABLE table5_groupX ( id int ); CREATE TABLE table5_groupX ( id int );
SELECT master_create_distributed_table('table5_groupX', 'id', 'hash'); SELECT master_create_distributed_table('table5_groupX', 'id', 'hash');
SELECT master_create_worker_shards('table5_groupX', 4, 2); SELECT master_create_worker_shards('table5_groupX', 4, 2);
table pg_dist_colocation;
table pg_dist_partition;
CREATE TABLE table6_append ( id int ); CREATE TABLE table6_append ( id int );
SELECT master_create_distributed_table('table6_append', 'id', 'append'); SELECT master_create_distributed_table('table6_append', 'id', 'append');
SELECT master_create_empty_shard('table6_append');select last_value from pg_dist_colocationid_seq ; SELECT master_create_empty_shard('table6_append');
SELECT master_create_empty_shard('table6_append');select last_value from pg_dist_colocationid_seq ;
SELECT master_create_empty_shard('table6_append');
-- make table1_group1 and table2_group1 co-located manually -- make table1_group1 and table2_group1 co-located manually
@ -108,8 +101,6 @@ SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1');
SELECT get_table_colocation_id('table1_group1'); SELECT get_table_colocation_id('table1_group1');
SELECT get_table_colocation_id('table5_groupX'); SELECT get_table_colocation_id('table5_groupX');
SELECT get_table_colocation_id('table6_append'); SELECT get_table_colocation_id('table6_append');
table pg_dist_colocation;
table pg_dist_partition;
-- check self table co-location -- check self table co-location
SELECT tables_colocated('table1_group1', 'table1_group1'); SELECT tables_colocated('table1_group1', 'table1_group1');
SELECT tables_colocated('table5_groupX', 'table5_groupX'); SELECT tables_colocated('table5_groupX', 'table5_groupX');
@ -124,8 +115,6 @@ SELECT tables_colocated('table1_group1', 'table3_group2');
-- check table co-location with invalid co-location group -- check table co-location with invalid co-location group
SELECT tables_colocated('table1_group1', 'table5_groupX'); SELECT tables_colocated('table1_group1', 'table5_groupX');
SELECT tables_colocated('table1_group1', 'table6_append'); SELECT tables_colocated('table1_group1', 'table6_append');
table pg_dist_colocation;
table pg_dist_partition;
-- check self shard co-location -- check self shard co-location
SELECT shards_colocated(1300000, 1300000); SELECT shards_colocated(1300000, 1300000);
SELECT shards_colocated(1300016, 1300016); SELECT shards_colocated(1300016, 1300016);
@ -139,8 +128,7 @@ SELECT shards_colocated(1300000, 1300001);
-- check shard co-location with different co-location group -- check shard co-location with different co-location group
SELECT shards_colocated(1300000, 1300005); SELECT shards_colocated(1300000, 1300005);
table pg_dist_colocation;
table pg_dist_partition;
-- check shard co-location with invalid co-location group -- check shard co-location with invalid co-location group
SELECT shards_colocated(1300000, 1300016); SELECT shards_colocated(1300000, 1300016);
SELECT shards_colocated(1300000, 1300020); SELECT shards_colocated(1300000, 1300020);
@ -161,8 +149,6 @@ SELECT find_shard_interval_index(1300001);
SELECT find_shard_interval_index(1300002); SELECT find_shard_interval_index(1300002);
SELECT find_shard_interval_index(1300003); SELECT find_shard_interval_index(1300003);
SELECT find_shard_interval_index(1300016); SELECT find_shard_interval_index(1300016);
table pg_dist_colocation;
table pg_dist_partition;
-- check external colocation API -- check external colocation API
SELECT count(*) FROM pg_dist_partition WHERE colocationid IN (4, 5); SELECT count(*) FROM pg_dist_partition WHERE colocationid IN (4, 5);
@ -192,8 +178,6 @@ UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='table2_groupB'::re
SET citus.shard_replication_factor to DEFAULT; SET citus.shard_replication_factor to DEFAULT;
-- change partition column type -- change partition column type
table pg_dist_colocation;
table pg_dist_partition;
set citus.log_remote_commands to true; set citus.log_remote_commands to true;
CREATE TABLE table1_groupC ( id text ); CREATE TABLE table1_groupC ( id text );
SELECT create_distributed_table('table1_groupC', 'id'); SELECT create_distributed_table('table1_groupC', 'id');
@ -570,3 +554,7 @@ DROP TABLE none;
DROP TABLE ref; DROP TABLE ref;
DROP TABLE local_table; DROP TABLE local_table;
DROP FOREIGN TABLE table3_groupD CASCADE; DROP FOREIGN TABLE table3_groupD CASCADE;
-- Resync remote nodes as well
SELECT citus_activate_node('localhost', :worker_1_port);
SELECT citus_activate_node('localhost', :worker_2_port);

View File

@ -6,10 +6,6 @@
-- metadata changes to MX tables. -- metadata changes to MX tables.
-- Turn metadata sync off at first -- Turn metadata sync off at first
\c - - - :worker_1_port
table pg_dist_partition;
table pg_dist_node;
\c - - - :master_port \c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
@ -105,27 +101,14 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 8888;
-- Add a node to another cluster to make sure it's also synced -- Add a node to another cluster to make sure it's also synced
SELECT master_add_secondary_node('localhost', 8889, 'localhost', :worker_1_port, nodecluster => 'second-cluster'); SELECT master_add_secondary_node('localhost', 8889, 'localhost', :worker_1_port, nodecluster => 'second-cluster');
\c - - - :worker_1_port
table pg_dist_partition;
table pg_dist_node;
table pg_dist_shard;
table pg_dist_shard_placement;
\c - - - :master_port \c - - - :master_port
-- Run start_metadata_sync_to_node and citus_activate_node and check that it marked hasmetadata for that worker -- Run start_metadata_sync_to_node and citus_activate_node and check that it marked hasmetadata for that worker
table pg_dist_partition;
\d
set citus.log_remote_commands to true;
set citus.worker_min_messages to debug5;
SELECT citus_activate_node('localhost', :worker_1_port); SELECT citus_activate_node('localhost', :worker_1_port);
reset citus.log_remote_commands;
reset citus.worker_min_messages;
SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port; SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port;
-- Check that the metadata has been copied to the worker -- Check that the metadata has been copied to the worker
\c - - - :worker_1_port \c - - - :worker_1_port
table pg_dist_partition;
SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_local_group;
SELECT * FROM pg_dist_node ORDER BY nodeid; SELECT * FROM pg_dist_node ORDER BY nodeid;
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
@ -189,12 +172,12 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass;
-- Make sure that citus_activate_node can be called inside a transaction and rollbacked -- Make sure that citus_activate_node can be called inside a transaction and rollbacked
--\c - - - :master_port \c - - - :master_port
--BEGIN; BEGIN;
--SELECT citus_activate_node('localhost', :worker_2_port); SELECT citus_activate_node('localhost', :worker_2_port);
--ROLLBACK; ROLLBACK;
--SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
-- Check that the distributed table can be queried from the worker -- Check that the distributed table can be queried from the worker
\c - - - :master_port \c - - - :master_port