Merge pull request #2345 from citusdata/truncate_on_workers

Support TRUNCATE from the MX worker nodes
Önder Kalacı 2018-09-03 15:29:14 +03:00 committed by GitHub
commit 3ace0ad5eb
10 changed files with 329 additions and 20 deletions
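For context, this change lets TRUNCATE on a metadata-synced (MX) distributed table be issued directly from a worker node instead of only from the coordinator. A minimal sketch of the enabled flow, reusing the tables created by the new regression test added below (run from a psql session connected to a worker):

\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- both plain TRUNCATE and TRUNCATE ... CASCADE now work from the worker
TRUNCATE on_update_fkey_table;
TRUNCATE referece_table CASCADE;

Before this change, the same statements failed on a worker with "ERROR: operation is not allowed on this node", as shown by the lines removed from the last two test files in this diff.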

@@ -31,6 +31,7 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_sync.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_router_executor.h"
@@ -43,6 +44,7 @@
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shard_pruning.h"
 #include "distributed/worker_protocol.h"
+#include "distributed/worker_transaction.h"
 #include "optimizer/clauses.h"
 #include "optimizer/predtest.h"
 #include "optimizer/restrictinfo.h"
@@ -89,6 +91,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	int32 affectedTupleCount = 0;
 	CmdType operation = CMD_UNKNOWN;
 	TaskType taskType = TASK_TYPE_INVALID_FIRST;
+	bool truncateOperation = false;
 #if (PG_VERSION_NUM >= 100000)
 	RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
 	queryTreeNode = rawStmt->stmt;
@@ -96,9 +99,13 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	queryTreeNode = ParseTreeNode(queryString);
 #endif
 
-	EnsureCoordinator();
-
 	CheckCitusVersion(ERROR);
 
+	truncateOperation = IsA(queryTreeNode, TruncateStmt);
+	if (!truncateOperation)
+	{
+		EnsureCoordinator();
+	}
+
 	if (IsA(queryTreeNode, DeleteStmt))
 	{
@@ -196,6 +203,30 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	taskList =
 		ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);
 
+	/*
+	 * We should execute "TRUNCATE table_name;" on the other worker nodes before
+	 * executing the truncate commands on the shards. This is necessary to prevent
+	 * distributed deadlocks where a concurrent operation on the same table (or a
+	 * cascading table) is executed on the other nodes.
+	 *
+	 * Note that we should skip the current node to prevent a self-deadlock.
+	 */
+	if (truncateOperation && ShouldSyncTableMetadata(relationId))
+	{
+		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
+							 DISABLE_DDL_PROPAGATION);
+
+		/*
+		 * Note that we ignore the schema here and send the queryString as-is,
+		 * since citus_truncate_trigger already uses a qualified table name.
+		 * If that were not the case, we would also have to set the search path
+		 * as we do for regular DDLs.
+		 */
+		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
+							 queryString);
+	}
+
 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 	{
 		affectedTupleCount =
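The block above sends the TRUNCATE statement itself to every other metadata worker, with DDL propagation disabled so those workers do not propagate it further, before the shard-level truncate tasks run. A rough sketch of what each of the other metadata workers receives inside the distributed transaction; the first command is only an assumption about what the DISABLE_DDL_PROPAGATION macro expands to, and the table name is simply the one from the new regression test:

SET citus.enable_ddl_propagation TO 'off';            -- assumed expansion of DISABLE_DDL_PROPAGATION
TRUNCATE truncate_from_workers.on_update_fkey_table;  -- the original queryString, already schema-qualified

Taking the table-level lock on the other nodes first is what prevents the distributed deadlock described in the comment; the local node is skipped via OTHER_WORKERS_WITH_METADATA to avoid a self-deadlock.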

@@ -117,7 +117,15 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
 		int nodePort = workerNode->workerPort;
 		int connectionFlags = FORCE_NEW_CONNECTION;
 
-		if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
+		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
+			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+			!workerNode->hasMetadata)
+		{
+			continue;
+		}
+
+		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;
 		}
@@ -169,7 +177,15 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
 		MultiConnection *connection = NULL;
 		int connectionFlags = 0;
 
-		if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
+		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
+			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+			!workerNode->hasMetadata)
+		{
+			continue;
+		}
+
+		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;
 		}

@@ -22,6 +22,7 @@
 typedef enum TargetWorkerSet
 {
 	WORKERS_WITH_METADATA,
+	OTHER_WORKERS_WITH_METADATA,
 	ALL_WORKERS
 } TargetWorkerSet;

@@ -295,6 +295,7 @@ DEBUG: common table expressions are not supported in distributed modifications
 DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
 DEBUG: common table expressions are not supported in distributed modifications
 DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
+SET citus.explain_all_tasks TO ON;
 EXPLAIN (COSTS FALSE) WITH cte_1 AS (
 WITH cte_2 AS (
 SELECT tenant_id as cte2_id
@@ -320,13 +321,25 @@ DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_
 -> Distributed Subplan 22_1
 -> Custom Scan (Citus Router)
 Task Count: 4
-Tasks Shown: One of 4
+Tasks Shown: All
 -> Task
 Node: host=localhost port=57638 dbname=regression
 -> Update on distributed_table_2370000 distributed_table
 -> Seq Scan on distributed_table_2370000 distributed_table
+-> Task
+Node: host=localhost port=57637 dbname=regression
+-> Update on distributed_table_2370001 distributed_table
+-> Seq Scan on distributed_table_2370001 distributed_table
+-> Task
+Node: host=localhost port=57638 dbname=regression
+-> Update on distributed_table_2370002 distributed_table
+-> Seq Scan on distributed_table_2370002 distributed_table
+-> Task
+Node: host=localhost port=57637 dbname=regression
+-> Update on distributed_table_2370003 distributed_table
+-> Seq Scan on distributed_table_2370003 distributed_table
 Task Count: 4
-Tasks Shown: One of 4
+Tasks Shown: All
 -> Task
 Node: host=localhost port=57638 dbname=regression
 -> Update on distributed_table_2370000 distributed_table
@@ -335,7 +348,31 @@ DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_
 -> Function Scan on read_intermediate_result intermediate_result
 -> Materialize
 -> Seq Scan on distributed_table_2370000 distributed_table
-(19 rows)
+-> Task
+Node: host=localhost port=57637 dbname=regression
+-> Update on distributed_table_2370001 distributed_table
+-> Nested Loop
+Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
+-> Function Scan on read_intermediate_result intermediate_result
+-> Materialize
+-> Seq Scan on distributed_table_2370001 distributed_table
+-> Task
+Node: host=localhost port=57638 dbname=regression
+-> Update on distributed_table_2370002 distributed_table
+-> Nested Loop
+Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
+-> Function Scan on read_intermediate_result intermediate_result
+-> Materialize
+-> Seq Scan on distributed_table_2370002 distributed_table
+-> Task
+Node: host=localhost port=57637 dbname=regression
+-> Update on distributed_table_2370003 distributed_table
+-> Nested Loop
+Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
+-> Function Scan on read_intermediate_result intermediate_result
+-> Materialize
+-> Seq Scan on distributed_table_2370003 distributed_table
+(55 rows)
 
 -- we don't support updating local table with a join with
 -- distributed tables

@@ -0,0 +1,136 @@
CREATE SCHEMA truncate_from_workers;
SET search_path TO 'truncate_from_workers';
SET citus.next_shard_id TO 2380000;
SET citus.next_placement_id TO 2380000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 6;
SET citus.replication_model TO streaming;
CREATE TABLE referece_table(id int PRIMARY KEY);
SELECT create_reference_table('referece_table');
create_reference_table
------------------------
(1 row)
CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('on_update_fkey_table', 'id');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE;
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator works as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
-- fill the table again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the coordinator
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
SELECT count(*) FROM referece_table;
count
-------
0
(1 row)
-- load some data for the next tests
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- make sure that DDLs work fine along with TRUNCATE
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN x INT;
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that TRUNCATE works as expected from the worker node
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the worker
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
SELECT count(*) FROM referece_table;
count
-------
0
(1 row)
-- test within transaction blocks
BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE on_update_fkey_table;
TRUNCATE referece_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that DMLs and SELECTs work fine along with TRUNCATE
BEGIN;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
count
-------
1001
(1 row)
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
count
-------
0
(1 row)
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA truncate_from_workers CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table truncate_from_workers.referece_table
drop cascades to table truncate_from_workers.on_update_fkey_table
SET search_path TO public;

@@ -220,17 +220,7 @@ SELECT master_remove_node('localhost', 5432);
 (1 row)
 
--- TRUNCATE
 \c - - - :worker_1_port
-TRUNCATE mx_table;
-ERROR: operation is not allowed on this node
-HINT: Connect to the coordinator and run it again.
-SELECT count(*) FROM mx_table;
- count
--------
-     5
-(1 row)
-
 -- mark_tables_colocated
 UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;
 SELECT mark_tables_colocated('mx_table', ARRAY['mx_table_2']);

@@ -27,7 +27,7 @@ test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10
 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19
 test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7
 test: multi_mx_tpch_query7_nested multi_mx_ddl
-test: recursive_dml_queries_mx
+test: recursive_dml_queries_mx multi_mx_truncate_from_worker
 test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
 test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
 test: multi_mx_metadata

@@ -234,6 +234,7 @@ SET dept = 5
 FROM cte_1
 WHERE distributed_table.tenant_id < cte_1.tenant_id;
 
+SET citus.explain_all_tasks TO ON;
 EXPLAIN (COSTS FALSE) WITH cte_1 AS (
 WITH cte_2 AS (
 SELECT tenant_id as cte2_id

@@ -0,0 +1,100 @@
CREATE SCHEMA truncate_from_workers;
SET search_path TO 'truncate_from_workers';
SET citus.next_shard_id TO 2380000;
SET citus.next_placement_id TO 2380000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 6;
SET citus.replication_model TO streaming;
CREATE TABLE referece_table(id int PRIMARY KEY);
SELECT create_reference_table('referece_table');
CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('on_update_fkey_table', 'id');
ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE;
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator works as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- fill the table again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the coordinator
TRUNCATE referece_table CASCADE;
SELECT count(*) FROM on_update_fkey_table;
SELECT count(*) FROM referece_table;
-- load some data for the next tests
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- make sure that DDLs work fine along with TRUNCATE
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN x INT;
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that TRUNCATE works as expected from the worker node
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the worker
TRUNCATE referece_table CASCADE;
SELECT count(*) FROM on_update_fkey_table;
SELECT count(*) FROM referece_table;
-- test within transaction blocks
BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE referece_table CASCADE;
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE on_update_fkey_table;
TRUNCATE referece_table CASCADE;
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
-- make sure that DMLs and SELECTs work fine along with TRUNCATE
BEGIN;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA truncate_from_workers CASCADE;
SET search_path TO public;

@@ -124,10 +124,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
 \c - - - :master_port
 SELECT master_remove_node('localhost', 5432);
 
--- TRUNCATE
 \c - - - :worker_1_port
-TRUNCATE mx_table;
-SELECT count(*) FROM mx_table;
 
 -- mark_tables_colocated
 UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;