adds citus.enable_repartition_joins GUC

The new GUC allows Citus to switch between task executors
when necessary
pull/1861/head
mehmet furkan şahin 2017-12-06 21:11:04 +03:00
parent 7544e91c87
commit 3c941aedf1
9 changed files with 69 additions and 24 deletions

View File

@ -29,6 +29,7 @@
int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
bool EnableRepartitionJoins = false;
/* /*
@ -98,13 +99,25 @@ JobExecutorType(DistributedPlan *distributedPlan)
"\"task-tracker\"."))); "\"task-tracker\".")));
} }
/* if we have repartition jobs with real time executor, error out */ /* if we have repartition jobs with real time executor and repartition
* joins are not enabled, error out. Otherwise, switch to task-tracker
*/
dependedJobCount = list_length(job->dependedJobList); dependedJobCount = list_length(job->dependedJobList);
if (dependedJobCount > 0) if (dependedJobCount > 0)
{ {
ereport(ERROR, (errmsg("cannot use real time executor with repartition jobs"), if (!EnableRepartitionJoins)
errhint("Set citus.task_executor_type to " {
"\"task-tracker\"."))); ereport(ERROR, (errmsg(
"the query contains a join that requires repartitioning"),
errhint("Set citus.enable_repartition_joins to on "
"to enable repartitioning")));
}
ereport(DEBUG1, (errmsg(
"cannot use real time executor with repartition jobs"),
errhint("Since you enabled citus.enable_repartition_joins "
"Citus chose to use task-tracker.")));
return MULTI_EXECUTOR_TASK_TRACKER;
} }
} }
else else

View File

@ -746,6 +746,16 @@ RegisterCitusConfigVariables(void)
0, 0,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_repartition_joins",
gettext_noop("Allows Citus to use task-tracker executor when necessary."),
NULL,
&EnableRepartitionJoins,
false,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomEnumVariable( DefineCustomEnumVariable(
"citus.shard_placement_policy", "citus.shard_placement_policy",
gettext_noop("Sets the policy to use when choosing nodes for shard placement."), gettext_noop("Sets the policy to use when choosing nodes for shard placement."),

View File

@ -190,6 +190,7 @@ typedef struct WorkerNodeState
extern int RemoteTaskCheckInterval; extern int RemoteTaskCheckInterval;
extern int MaxAssignTaskBatchSize; extern int MaxAssignTaskBatchSize;
extern int TaskExecutorType; extern int TaskExecutorType;
extern bool EnableRepartitionJoins;
extern bool BinaryMasterCopyFormat; extern bool BinaryMasterCopyFormat;
extern int MultiTaskQueryLogLevel; extern int MultiTaskQueryLogLevel;

View File

@ -453,8 +453,8 @@ DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 11 DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SELECT articles_hash_mx.id,test.word_count SELECT articles_hash_mx.id,test.word_count
FROM articles_hash_mx, (SELECT id, word_count FROM articles_hash_mx) AS test FROM articles_hash_mx, (SELECT id, word_count FROM articles_hash_mx) AS test
WHERE test.id = articles_hash_mx.id and articles_hash_mx.author_id = 1 WHERE test.id = articles_hash_mx.id and articles_hash_mx.author_id = 1
@ -487,8 +487,8 @@ DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 9 DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 11 DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- subqueries are not supported in SELECT clause -- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash_mx a2 WHERE a.id = a2.id LIMIT 1) SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash_mx a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles_hash_mx a; AS special_price FROM articles_hash_mx a;
@ -793,8 +793,8 @@ LIMIT 5;
SELECT * SELECT *
FROM articles_hash_mx a, articles_hash_mx b FROM articles_hash_mx a, articles_hash_mx b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- queries which hit more than 1 shards are not router plannable or executable -- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor -- handled by real-time executor
SELECT * SELECT *

View File

@ -569,8 +569,8 @@ DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 11 DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SELECT articles_hash.id,test.word_count SELECT articles_hash.id,test.word_count
FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test
WHERE test.id = articles_hash.id and articles_hash.author_id = 1 WHERE test.id = articles_hash.id and articles_hash.author_id = 1
@ -603,8 +603,8 @@ DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 9 DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 11 DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- subqueries are not supported in SELECT clause -- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1) SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles_hash a; AS special_price FROM articles_hash a;
@ -910,8 +910,23 @@ LIMIT 5;
SELECT * SELECT *
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- by setting enable_repartition_joins we can make this query run
SET citus.enable_repartition_joins TO ON;
SELECT *
FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1;
id | author_id | title | word_count | id | author_id | title | word_count
----+-----------+--------------+------------+----+-----------+--------------+------------
41 | 1 | aznavour | 11814 | 41 | 1 | aznavour | 11814
11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347
31 | 1 | athwartships | 7271 | 31 | 1 | athwartships | 7271
1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572
21 | 1 | arcading | 5890 | 21 | 1 | arcading | 5890
(5 rows)
SET citus.enable_repartition_joins TO OFF;
-- queries which hit more than 1 shards are not router plannable or executable -- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor -- handled by real-time executor
SELECT * SELECT *
@ -1674,8 +1689,8 @@ DEBUG: pruning merge fetch taskId 19
DETAIL: Creating dependency on merge taskId 9 DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 22 DEBUG: pruning merge fetch taskId 22
DETAIL: Creating dependency on merge taskId 9 DETAIL: Creating dependency on merge taskId 9
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- join between a range partitioned table and reference table is router plannable -- join between a range partitioned table and reference table is router plannable
SELECT * FROM articles_range ar join authors_reference au on (ar.author_id = au.id) SELECT * FROM articles_range ar join authors_reference au on (ar.author_id = au.id)
WHERE ar.author_id = 1; WHERE ar.author_id = 1;

View File

@ -559,8 +559,8 @@ DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 9 DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 11 DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- system columns from shard tables can be queried and retrieved -- system columns from shard tables can be queried and retrieved
SELECT count(*) FROM ( SELECT count(*) FROM (
SELECT tableoid, ctid, cmin, cmax, xmin, xmax SELECT tableoid, ctid, cmin, cmax, xmin, xmax

View File

@ -40,8 +40,8 @@ FROM
lineitem_subquery lineitem_subquery
GROUP BY GROUP BY
l_suppkey) AS order_counts; l_suppkey) AS order_counts;
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- Check that we error out if join is not on partition columns. -- Check that we error out if join is not on partition columns.
SELECT SELECT
avg(unit_price) avg(unit_price)

View File

@ -173,8 +173,8 @@ SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_linei
-- repartition query on view join -- repartition query on view join
-- it passes planning, fails at execution stage -- it passes planning, fails at execution stage
SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
ERROR: cannot use real time executor with repartition jobs ERROR: the query contains a join that requires repartitioning
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET citus.task_executor_type to "task-tracker"; SET citus.task_executor_type to "task-tracker";
SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
count count

View File

@ -445,6 +445,12 @@ SELECT *
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
-- by setting enable_repartition_joins we can make this query run
SET citus.enable_repartition_joins TO ON;
SELECT *
FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1;
SET citus.enable_repartition_joins TO OFF;
-- queries which hit more than 1 shards are not router plannable or executable -- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor -- handled by real-time executor
SELECT * SELECT *