From 26e308bf2ad0a84356b18ca1200b0f89a2172377 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 20 Aug 2018 19:55:04 +0300 Subject: [PATCH] Support TRUNCATE from the MX worker nodes This commit enables support for TRUNCATE on both distributed tables and reference tables. The basic idea is to acquire a lock on the relation by sending the TRUNCATE command to all metadata worker nodes. We only skip sending the TRUNCATE command to the node that actually executes the command to prevent a self-distributed-deadlock. --- .../master/master_modify_multiple_shards.c | 33 ++++- .../transaction/worker_transaction.c | 8 +- src/include/distributed/worker_transaction.h | 2 +- .../multi_mx_truncate_from_worker.out | 136 ++++++++++++++++++ .../multi_unsupported_worker_operations.out | 10 -- src/test/regress/multi_mx_schedule | 2 +- .../sql/multi_mx_truncate_from_worker.sql | 100 +++++++++++++ .../multi_unsupported_worker_operations.sql | 3 - 8 files changed, 274 insertions(+), 20 deletions(-) create mode 100644 src/test/regress/expected/multi_mx_truncate_from_worker.out create mode 100644 src/test/regress/sql/multi_mx_truncate_from_worker.sql diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index dcf43eb03..3c1b8fea8 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -31,6 +31,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_executor.h" @@ -43,6 +44,7 @@ #include "distributed/shardinterval_utils.h" #include "distributed/shard_pruning.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" #include 
"optimizer/clauses.h" #include "optimizer/predtest.h" #include "optimizer/restrictinfo.h" @@ -89,6 +91,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) int32 affectedTupleCount = 0; CmdType operation = CMD_UNKNOWN; TaskType taskType = TASK_TYPE_INVALID_FIRST; + bool truncateOperation = false; #if (PG_VERSION_NUM >= 100000) RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); queryTreeNode = rawStmt->stmt; @@ -96,9 +99,13 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) queryTreeNode = ParseTreeNode(queryString); #endif - EnsureCoordinator(); CheckCitusVersion(ERROR); + truncateOperation = IsA(queryTreeNode, TruncateStmt); + if (!truncateOperation) + { + EnsureCoordinator(); + } if (IsA(queryTreeNode, DeleteStmt)) { @@ -196,6 +203,30 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType); + /* + * We should execute "TRUNCATE table_name;" on the other worker nodes before + * executing the truncate commands on the shards. This is necessary to prevent + * distributed deadlocks where a concurrent operation on the same table (or a + * cascading table) is executed on the other nodes. + * + * Note that we should skip the current node to prevent a self-deadlock. + */ + if (truncateOperation && ShouldSyncTableMetadata(relationId)) + { + SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA, + DISABLE_DDL_PROPAGATION); + + + /* + * Note that here we ignore the schema and send the queryString as is + * since citus_truncate_trigger already uses qualified table name. + * If that was not the case, we should also had to set the search path + * as we do for regular DDLs. 
+ */ + SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA, + queryString); + } + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { affectedTupleCount = diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index cbe146e08..4e4db59d2 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -118,13 +118,13 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) int connectionFlags = FORCE_NEW_CONNECTION; if ((targetWorkerSet == WORKERS_WITH_METADATA || - targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER) && + targetWorkerSet == OTHER_WORKERS_WITH_METADATA) && !workerNode->hasMetadata) { continue; } - if (targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER && + if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA && workerNode->groupId == GetLocalGroupId()) { continue; @@ -178,13 +178,13 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, int connectionFlags = 0; if ((targetWorkerSet == WORKERS_WITH_METADATA || - targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER) && + targetWorkerSet == OTHER_WORKERS_WITH_METADATA) && !workerNode->hasMetadata) { continue; } - if (targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER && + if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA && workerNode->groupId == GetLocalGroupId()) { continue; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index a41b1326f..048e684cc 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -22,7 +22,7 @@ typedef enum TargetWorkerSet { WORKERS_WITH_METADATA, - WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER, + OTHER_WORKERS_WITH_METADATA, ALL_WORKERS } TargetWorkerSet; diff --git 
a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out new file mode 100644 index 000000000..0d3b75bc4 --- /dev/null +++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out @@ -0,0 +1,136 @@ +CREATE SCHEMA truncate_from_workers; +SET search_path TO 'truncate_from_workers'; +SET citus.next_shard_id TO 2380000; +SET citus.next_placement_id TO 2380000; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; +SET citus.replication_model TO streaming; +CREATE TABLE referece_table(id int PRIMARY KEY); +SELECT create_reference_table('referece_table'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int); +SELECT create_distributed_table('on_update_fkey_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE; +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- first, make sure that truncate from the coordinator workers as expected +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +-- fill the table again +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- now, show that TRUNCATE CASCADE works expected from the coordinator +TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +SELECT count(*) FROM referece_table; + count +------- + 0 +(1 row) + +-- load some data for the next tests +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- make sure 
that DDLs along with TRUNCATE worker fine +BEGIN; + ALTER TABLE on_update_fkey_table ADD COLUMN x INT; + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +ROLLBACK; +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; +-- make sure that TRUNCATE workes expected from the worker node +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +-- load some data +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; +-- now, show that TRUNCATE CASCADE works expected from the worker +TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +SELECT count(*) FROM referece_table; + count +------- + 0 +(1 row) + +-- test within transaction blocks +BEGIN; + TRUNCATE on_update_fkey_table; +ROLLBACK; +-- test within transaction blocks +BEGIN; + TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +ROLLBACK; +-- test with sequential mode and CASCADE +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO sequential; + TRUNCATE on_update_fkey_table; + TRUNCATE referece_table CASCADE; +NOTICE: truncate cascades to table "on_update_fkey_table" +ROLLBACK; +-- fill some data for the next test +\c - - - :master_port +SET search_path TO 'truncate_from_workers'; +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; +-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine +BEGIN; + INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + SELECT count(*) FROM on_update_fkey_table; + count +------- + 1001 +(1 row) + + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; + count +------- + 0 +(1 row) + +ROLLBACK; +RESET client_min_messages; +\c - 
- - :master_port +DROP SCHEMA truncate_from_workers CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table truncate_from_workers.referece_table +drop cascades to table truncate_from_workers.on_update_fkey_table +SET search_path TO public; diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index d46760e8a..50bc7713b 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -220,17 +220,7 @@ SELECT master_remove_node('localhost', 5432); (1 row) --- TRUNCATE \c - - - :worker_1_port -TRUNCATE mx_table; -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -SELECT count(*) FROM mx_table; - count -------- - 5 -(1 row) - -- mark_tables_colocated UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass; SELECT mark_tables_colocated('mx_table', ARRAY['mx_table_2']); diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 3186c2ed4..c495afe3e 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -27,7 +27,7 @@ test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19 test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7 test: multi_mx_tpch_query7_nested multi_mx_ddl -test: recursive_dml_queries_mx +test: recursive_dml_queries_mx multi_mx_truncate_from_worker test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_metadata diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql new file mode 100644 
index 000000000..00bdd13e6 --- /dev/null +++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql @@ -0,0 +1,100 @@ +CREATE SCHEMA truncate_from_workers; +SET search_path TO 'truncate_from_workers'; + +SET citus.next_shard_id TO 2380000; +SET citus.next_placement_id TO 2380000; + +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; +SET citus.replication_model TO streaming; + +CREATE TABLE referece_table(id int PRIMARY KEY); +SELECT create_reference_table('referece_table'); + +CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int); +SELECT create_distributed_table('on_update_fkey_table', 'id'); + +ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE; + +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- first, make sure that truncate from the coordinator workers as expected +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + +-- fill the table again +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- now, show that TRUNCATE CASCADE works expected from the coordinator +TRUNCATE referece_table CASCADE; +SELECT count(*) FROM on_update_fkey_table; +SELECT count(*) FROM referece_table; + +-- load some data for the next tests +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- make sure that DDLs along with TRUNCATE worker fine +BEGIN; + ALTER TABLE on_update_fkey_table ADD COLUMN x INT; + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; +ROLLBACK; + + +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; + +-- make sure that TRUNCATE workes expected from the worker node +TRUNCATE on_update_fkey_table; +SELECT count(*) FROM on_update_fkey_table; + +-- 
load some data +INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + +-- now, show that TRUNCATE CASCADE works expected from the worker +TRUNCATE referece_table CASCADE; +SELECT count(*) FROM on_update_fkey_table; +SELECT count(*) FROM referece_table; + +-- test within transaction blocks +BEGIN; + TRUNCATE on_update_fkey_table; +ROLLBACK; + +-- test within transaction blocks +BEGIN; + TRUNCATE referece_table CASCADE; +ROLLBACK; + +-- test with sequential mode and CASCADE +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO sequential; + TRUNCATE on_update_fkey_table; + TRUNCATE referece_table CASCADE; +ROLLBACK; + +-- fill some data for the next test +\c - - - :master_port +SET search_path TO 'truncate_from_workers'; +INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i; + +\c - - - :worker_1_port +SET search_path TO 'truncate_from_workers'; + +-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine +BEGIN; + INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; + SELECT count(*) FROM on_update_fkey_table; + TRUNCATE on_update_fkey_table; + SELECT count(*) FROM on_update_fkey_table; +ROLLBACK; + +RESET client_min_messages; + +\c - - - :master_port + +DROP SCHEMA truncate_from_workers CASCADE; + +SET search_path TO public; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 83ffc1808..a58b489d3 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -124,10 +124,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; \c - - - :master_port SELECT master_remove_node('localhost', 5432); --- TRUNCATE \c - - - :worker_1_port -TRUNCATE mx_table; -SELECT count(*) FROM mx_table; -- mark_tables_colocated UPDATE pg_dist_partition SET colocationid = 0 WHERE 
logicalrelid='mx_table_2'::regclass;