Support TRUNCATE from the MX worker nodes

This commit enables support for TRUNCATE on both
distributed and reference tables.

The basic idea is to acquire locks on the relation by sending
the TRUNCATE command to all metadata worker nodes. We only
skip sending the TRUNCATE command to the node that actually
executes the command, to prevent a self-distributed-deadlock.
pull/2345/head
Onder Kalaci 2018-08-20 19:55:04 +03:00
parent 97ba7bf2eb
commit 26e308bf2a
8 changed files with 274 additions and 20 deletions
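
The crux of the change is a broadcast that skips the executing node: send the TRUNCATE to every metadata-holding worker except the one whose group id matches the local group id. Below is a minimal, self-contained C sketch of that pattern; WorkerNodeStub, localGroupId, and SendTruncateToOtherMetadataWorkers are hypothetical stand-ins for illustration only — the real implementation uses Citus' WorkerNode, GetLocalGroupId(), and SendCommandToWorkers(), as the diff below shows.

#include <stdio.h>
#include <stdbool.h>

/* hypothetical stand-in for Citus' WorkerNode */
typedef struct WorkerNodeStub
{
	int groupId;          /* node group this worker belongs to */
	bool hasMetadata;     /* whether the worker holds Citus metadata */
	const char *name;
} WorkerNodeStub;

/* pretend we are executing on the worker in group 2 */
static const int localGroupId = 2;

static void
SendTruncateToOtherMetadataWorkers(WorkerNodeStub *workers, int workerCount,
								   const char *command)
{
	for (int i = 0; i < workerCount; i++)
	{
		WorkerNodeStub *worker = &workers[i];

		/* workers without metadata do not know the table; skip them */
		if (!worker->hasMetadata)
		{
			continue;
		}

		/* skip the executing node to avoid a self-distributed-deadlock */
		if (worker->groupId == localGroupId)
		{
			continue;
		}

		printf("sending to %s: %s\n", worker->name, command);
	}
}

int
main(void)
{
	WorkerNodeStub workers[] = {
		{ 1, true, "worker-1" },
		{ 2, true, "worker-2 (local)" },
		{ 3, false, "worker-3" }
	};

	SendTruncateToOtherMetadataWorkers(workers, 3, "TRUNCATE dist_table;");
	return 0;
}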


@@ -31,6 +31,7 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_sync.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_router_executor.h"
@@ -43,6 +44,7 @@
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shard_pruning.h"
 #include "distributed/worker_protocol.h"
+#include "distributed/worker_transaction.h"
 #include "optimizer/clauses.h"
 #include "optimizer/predtest.h"
 #include "optimizer/restrictinfo.h"
@@ -89,6 +91,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	int32 affectedTupleCount = 0;
 	CmdType operation = CMD_UNKNOWN;
 	TaskType taskType = TASK_TYPE_INVALID_FIRST;
+	bool truncateOperation = false;
 #if (PG_VERSION_NUM >= 100000)
 	RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
 	queryTreeNode = rawStmt->stmt;
@@ -96,9 +99,13 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	queryTreeNode = ParseTreeNode(queryString);
 #endif

-	EnsureCoordinator();
 	CheckCitusVersion(ERROR);

+	truncateOperation = IsA(queryTreeNode, TruncateStmt);
+	if (!truncateOperation)
+	{
+		EnsureCoordinator();
+	}
+
 	if (IsA(queryTreeNode, DeleteStmt))
 	{
@@ -196,6 +203,30 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	taskList =
 		ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);

+	/*
+	 * We should execute "TRUNCATE table_name;" on the other worker nodes before
+	 * executing the truncate commands on the shards. This is necessary to prevent
+	 * distributed deadlocks where a concurrent operation on the same table (or a
+	 * cascading table) is executed on the other nodes.
+	 *
+	 * Note that we should skip the current node to prevent a self-deadlock.
+	 */
+	if (truncateOperation && ShouldSyncTableMetadata(relationId))
+	{
+		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
+							 DISABLE_DDL_PROPAGATION);
+
+		/*
+		 * Note that here we ignore the schema and send the queryString as is,
+		 * since citus_truncate_trigger already uses the qualified table name.
+		 * If that were not the case, we would also have to set the search
+		 * path, as we do for regular DDLs.
+		 */
+		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
+							 queryString);
+	}
+
 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 	{
 		affectedTupleCount =


@@ -118,13 +118,13 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
 		int connectionFlags = FORCE_NEW_CONNECTION;

 		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-			 targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER) &&
+			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
 			!workerNode->hasMetadata)
 		{
 			continue;
 		}

-		if (targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER &&
+		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
 			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;
@@ -178,13 +178,13 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
 		int connectionFlags = 0;

 		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-			 targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER) &&
+			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
 			!workerNode->hasMetadata)
 		{
 			continue;
 		}

-		if (targetWorkerSet == WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER &&
+		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
 			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;


@@ -22,7 +22,7 @@
 typedef enum TargetWorkerSet
 {
 	WORKERS_WITH_METADATA,
-	WORKERS_WITH_METADATA_EXCLUDE_CURRENT_WORKER,
+	OTHER_WORKERS_WITH_METADATA,
 	ALL_WORKERS
 } TargetWorkerSet;


@@ -0,0 +1,136 @@
CREATE SCHEMA truncate_from_workers;
SET search_path TO 'truncate_from_workers';
SET citus.next_shard_id TO 2380000;
SET citus.next_placement_id TO 2380000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 6;
SET citus.replication_model TO streaming;
CREATE TABLE referece_table(id int PRIMARY KEY);
SELECT create_reference_table('referece_table');
create_reference_table
------------------------
(1 row)
CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('on_update_fkey_table', 'id');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE;
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator works as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
-- fill the table again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the coordinator
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
SELECT count(*) FROM referece_table;
count
-------
0
(1 row)
-- load some data for the next tests
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- make sure that DDLs work fine along with TRUNCATE
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN x INT;
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that TRUNCATE works as expected from the worker node
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the worker
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
SELECT count(*) FROM referece_table;
count
-------
0
(1 row)
-- test within transaction blocks
BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE on_update_fkey_table;
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that DMLs and SELECTs work fine along with TRUNCATE
BEGIN;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
count
-------
1001
(1 row)
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA truncate_from_workers CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table truncate_from_workers.referece_table
drop cascades to table truncate_from_workers.on_update_fkey_table
SET search_path TO public;


@@ -220,17 +220,7 @@ SELECT master_remove_node('localhost', 5432);
 (1 row)

--- TRUNCATE
 \c - - - :worker_1_port
-TRUNCATE mx_table;
-ERROR:  operation is not allowed on this node
-HINT:  Connect to the coordinator and run it again.
-SELECT count(*) FROM mx_table;
- count
--------
-     5
-(1 row)
-
 -- mark_tables_colocated
 UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;
 SELECT mark_tables_colocated('mx_table', ARRAY['mx_table_2']);


@@ -27,7 +27,7 @@ test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10
 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19
 test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7
 test: multi_mx_tpch_query7_nested multi_mx_ddl
-test: recursive_dml_queries_mx
+test: recursive_dml_queries_mx multi_mx_truncate_from_worker
 test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
 test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
 test: multi_mx_metadata


@@ -0,0 +1,100 @@
CREATE SCHEMA truncate_from_workers;
SET search_path TO 'truncate_from_workers';
SET citus.next_shard_id TO 2380000;
SET citus.next_placement_id TO 2380000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 6;
SET citus.replication_model TO streaming;
CREATE TABLE referece_table(id int PRIMARY KEY);
SELECT create_reference_table('referece_table');
CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('on_update_fkey_table', 'id');
ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE;
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator works as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- fill the table again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the coordinator
TRUNCATE referece_table CASCADE;
SELECT count(*) FROM on_update_fkey_table;
SELECT count(*) FROM referece_table;
-- load some data for the next tests
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- make sure that DDLs work fine along with TRUNCATE
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN x INT;
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that TRUNCATE works as expected from the worker node
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the worker
TRUNCATE referece_table CASCADE;
SELECT count(*) FROM on_update_fkey_table;
SELECT count(*) FROM referece_table;
-- test within transaction blocks
BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE referece_table CASCADE;
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE on_update_fkey_table;
TRUNCATE referece_table CASCADE;
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that DMLs and SELECTs work fine along with TRUNCATE
BEGIN;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA truncate_from_workers CASCADE;
SET search_path TO public;


@@ -124,10 +124,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
 \c - - - :master_port
 SELECT master_remove_node('localhost', 5432);

--- TRUNCATE
 \c - - - :worker_1_port
-TRUNCATE mx_table;
-SELECT count(*) FROM mx_table;

 -- mark_tables_colocated
 UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;