diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index dcf43eb03..3c1b8fea8 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -31,6 +31,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_executor.h" @@ -43,6 +44,7 @@ #include "distributed/shardinterval_utils.h" #include "distributed/shard_pruning.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" #include "optimizer/clauses.h" #include "optimizer/predtest.h" #include "optimizer/restrictinfo.h" @@ -89,6 +91,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) int32 affectedTupleCount = 0; CmdType operation = CMD_UNKNOWN; TaskType taskType = TASK_TYPE_INVALID_FIRST; + bool truncateOperation = false; #if (PG_VERSION_NUM >= 100000) RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); queryTreeNode = rawStmt->stmt; @@ -96,9 +99,13 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) queryTreeNode = ParseTreeNode(queryString); #endif - EnsureCoordinator(); CheckCitusVersion(ERROR); + truncateOperation = IsA(queryTreeNode, TruncateStmt); + if (!truncateOperation) + { + EnsureCoordinator(); + } if (IsA(queryTreeNode, DeleteStmt)) { @@ -196,6 +203,30 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType); + /* + * We should execute "TRUNCATE table_name;" on the other worker nodes before + * executing the truncate commands on the shards. This is necessary to prevent + * distributed deadlocks where a concurrent operation on the same table (or a + * cascading table) is executed on the other nodes. + * + * Note that we should skip the current node to prevent a self-deadlock. + */ + if (truncateOperation && ShouldSyncTableMetadata(relationId)) + { + SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA, + DISABLE_DDL_PROPAGATION); + + + /* + * Note that here we ignore the schema and send the queryString as is + * since citus_truncate_trigger already uses qualified table name. + * If that was not the case, we should also had to set the search path + * as we do for regular DDLs. + */ + SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA, + queryString); + } + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { affectedTupleCount = diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index cbe146e08..4e4db59d2 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -118,13 +118,13 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) int connectionFlags = FORCE_NEW_CONNECTION; if ((targetWorkerSet == WORKERS_WITH_METADATA || - targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER) && + targetWorkerSet == OTHER_WORKERS_WITH_METADATA) && !workerNode->hasMetadata) { continue; } - if (targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER && + if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA && workerNode->groupId == GetLocalGroupId()) { continue; @@ -178,13 +178,13 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, int connectionFlags = 0; if ((targetWorkerSet == WORKERS_WITH_METADATA || - targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER) && + targetWorkerSet == OTHER_WORKERS_WITH_METADATA) && !workerNode->hasMetadata) { continue; } - if (targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER && + if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA && workerNode->groupId == GetLocalGroupId()) { continue; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index a41b1326f..048e684cc 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -22,7 +22,7 @@ typedef enum TargetWorkerSet { WORKERS_WITH_METADATA, - WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER, + OTHER_WORKERS_WITH_METADATA, ALL_WORKERS } TargetWorkerSet; diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out new file mode 100644 index 000000000..0d3b75bc4 --- /dev/null +++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out @@ -0,0 +1,136 @@ +CREATE SCHEMA truncate_from_workers; +SET search_path TO 'truncate_from_workers'; +SET citus.next_shard_id TO 2380000; +SET citus.next_placement_id TO 2380000; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; +SET citus.replication_model TO streaming; +CREATE TABLE referece_table(id int PRIMARY KEY); +SELECT create_reference_table('referece_table'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int); +SELECT create_distributed_table('on_update_fkey_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE; +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- first, make sure that truncate from the coordinator workers as expected +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +-- fill the table again +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- now, show that TRUNCATE CASCADE works expected from the coordinator +TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +SELECT count(*) FROM referece_table; + count +------- + 0 +(1 row) + +-- load some data for the next tests +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- make sure that DDLs along with TRUNCATE worker fine +BEGIN; + ALTER TABLE on_update_fkey_table ADD COLUMN x INT; + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +ROLLBACK; +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; +-- make sure that TRUNCATE workes expected from the worker node +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +-- load some data +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- now, show that TRUNCATE CASCADE works expected from the worker +TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +SELECT count(*) FROM referece_table; + count +------- + 0 +(1 row) + +-- test within transaction blocks +BEGIN; + TRUNCATE on_update_fkey_table; +ROLLBACK; +-- test within transaction blocks +BEGIN; + TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +ROLLBACK; +-- test with sequential mode and CASCADE +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO sequential; + TRUNCATE on_update_fkey_table; + TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +ROLLBACK; +-- fill some data for the next test +\c - - - :master_port +SET search_path TO 'truncate_from_workers'; +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; +-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine +BEGIN; + INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + SELECT count(*) FROM on_update_fkey_table; + count +------- + 1001 +(1 row) + + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +ROLLBACK; +RESET client_min_messages; +\c - - - :master_port +DROP SCHEMA truncate_from_workers CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table truncate_from_workers.referece_table +drop cascades to table truncate_from_workers.on_update_fkey_table +SET search_path TO public; diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index d46760e8a..50bc7713b 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -220,17 +220,7 @@ SELECT master_remove_node('localhost', 5432); (1 row) --- TRUNCATE \c - - - :worker_1_port -TRUNCATE mx_table; -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -SELECT count(*) FROM mx_table; - count -------- - 5 -(1 row) - -- mark_tables_colocated UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass; SELECT mark_tables_colocated('mx_table', ARRAY['mx_table_2']); diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 3186c2ed4..c495afe3e 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -27,7 +27,7 @@ test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19 test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7 test: multi_mx_tpch_query7_nested multi_mx_ddl -test: recursive_dml_queries_mx +test: recursive_dml_queries_mx multi_mx_truncate_from_worker test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_metadata diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql new file mode 100644 index 000000000..00bdd13e6 --- /dev/null +++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql @@ -0,0 +1,100 @@ +CREATE SCHEMA truncate_from_workers; +SET search_path TO 'truncate_from_workers'; + +SET citus.next_shard_id TO 2380000; +SET citus.next_placement_id TO 2380000; + +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; +SET citus.replication_model TO streaming; + +CREATE TABLE referece_table(id int PRIMARY KEY); +SELECT create_reference_table('referece_table'); + +CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int); +SELECT create_distributed_table('on_update_fkey_table', 'id'); + +ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE; + +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- first, make sure that truncate from the coordinator workers as expected +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + +-- fill the table again +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- now, show that TRUNCATE CASCADE works expected from the coordinator +TRUNCATE referece_table CASCADE; +SELECT count(*) FROM on_update_fkey_table; +SELECT count(*) FROM referece_table; + +-- load some data for the next tests +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- make sure that DDLs along with TRUNCATE worker fine +BEGIN; + ALTER TABLE on_update_fkey_table ADD COLUMN x INT; + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; +ROLLBACK; + + +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; + +-- make sure that TRUNCATE workes expected from the worker node +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + +-- load some data +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- now, show that TRUNCATE CASCADE works expected from the worker +TRUNCATE referece_table CASCADE; +SELECT count(*) FROM on_update_fkey_table; +SELECT count(*) FROM referece_table; + +-- test within transaction blocks +BEGIN; + TRUNCATE on_update_fkey_table; +ROLLBACK; + +-- test within transaction blocks +BEGIN; + TRUNCATE referece_table CASCADE; +ROLLBACK; + +-- test with sequential mode and CASCADE +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO sequential; + TRUNCATE on_update_fkey_table; + TRUNCATE referece_table CASCADE; +ROLLBACK; + +-- fill some data for the next test +\c - - - :master_port +SET search_path TO 'truncate_from_workers'; +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; + +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; + +-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine +BEGIN; + INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + SELECT count(*) FROM on_update_fkey_table; + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; +ROLLBACK; + +RESET client_min_messages; + +\c - - - :master_port + +DROP SCHEMA truncate_from_workers CASCADE; + +SET search_path TO public; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 83ffc1808..a58b489d3 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -124,10 +124,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; \c - - - :master_port SELECT master_remove_node('localhost', 5432); --- TRUNCATE \c - - - :worker_1_port -TRUNCATE mx_table; -SELECT count(*) FROM mx_table; -- mark_tables_colocated UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;