mirror of https://github.com/citusdata/citus.git
Merge pull request #4005 from citusdata/fix/coordinator_repartition_join
Send commands to coordinator when it is added as a workerpull/3698/head
commit
fd760fa4b3
|
@ -118,8 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
|
|||
}
|
||||
else
|
||||
{
|
||||
List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
|
||||
int workerNodeCount = list_length(workerNodeList);
|
||||
int workerNodeCount = list_length(ActiveReadableNodeList());
|
||||
int taskCount = list_length(job->taskList);
|
||||
double tasksPerNode = taskCount / ((double) workerNodeCount);
|
||||
|
||||
|
|
|
@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job)
|
|||
* assigning and checking the status of tasks. The second (temporary) hash
|
||||
* helps us in fetching results data from worker nodes to the master node.
|
||||
*/
|
||||
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
|
||||
List *workerNodeList = ActivePrimaryNodeList(ShareLock);
|
||||
uint32 taskTrackerCount = (uint32) list_length(workerNodeList);
|
||||
|
||||
/* connect as the current user for running queries */
|
||||
|
|
|
@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
|
|||
{
|
||||
List *jobIds = ExtractJobsInJobTree(topLeveLJob);
|
||||
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
|
||||
SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand,
|
||||
SendCommandToWorkersInParallel(ALL_SHARD_NODES, createSchemasCommand,
|
||||
CitusExtensionOwnerName());
|
||||
return jobIds;
|
||||
}
|
||||
|
@ -191,8 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand)
|
|||
void
|
||||
DoRepartitionCleanup(List *jobIds)
|
||||
{
|
||||
SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES,
|
||||
GenerateDeleteJobsCommand(jobIds),
|
||||
SendCommandToWorkersOptionalInParallel(ALL_SHARD_NODES, GenerateDeleteJobsCommand(
|
||||
jobIds),
|
||||
CitusExtensionOwnerName());
|
||||
}
|
||||
|
||||
|
|
|
@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey,
|
|||
static uint32
|
||||
HashPartitionCount(void)
|
||||
{
|
||||
uint32 groupCount = ActiveReadableNonCoordinatorNodeCount();
|
||||
uint32 groupCount = list_length(ActiveReadableNodeList());
|
||||
double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
|
||||
|
||||
uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode);
|
||||
|
@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList)
|
|||
* if subsequent jobs have a small number of tasks, we won't allocate the
|
||||
* tasks to the same worker repeatedly.
|
||||
*/
|
||||
List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
|
||||
List *workerNodeList = ActiveReadableNodeList();
|
||||
uint32 workerNodeCount = (uint32) list_length(workerNodeList);
|
||||
uint32 beginningNodeIndex = jobId % workerNodeCount;
|
||||
|
||||
|
|
|
@ -130,7 +130,15 @@ SendCommandToWorkersWithMetadata(const char *command)
|
|||
List *
|
||||
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
|
||||
{
|
||||
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
|
||||
List *workerNodeList = NIL;
|
||||
if (targetWorkerSet == ALL_SHARD_NODES)
|
||||
{
|
||||
workerNodeList = ActivePrimaryNodeList(lockMode);
|
||||
}
|
||||
else
|
||||
{
|
||||
workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
|
||||
}
|
||||
List *result = NIL;
|
||||
|
||||
WorkerNode *workerNode = NULL;
|
||||
|
|
|
@ -18,13 +18,13 @@
|
|||
|
||||
/*
|
||||
* TargetWorkerSet is used for determining the type of workers that a command
|
||||
* is targeted to. Currently it doesn't include coordinator even if it is added
|
||||
* as a worker.
|
||||
* is targeted to.
|
||||
*/
|
||||
typedef enum TargetWorkerSet
|
||||
{
|
||||
NON_COORDINATOR_METADATA_NODES,
|
||||
NON_COORDINATOR_NODES
|
||||
NON_COORDINATOR_NODES,
|
||||
ALL_SHARD_NODES
|
||||
} TargetWorkerSet;
|
||||
|
||||
|
||||
|
|
|
@ -198,13 +198,23 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato
|
|||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- Commented out since it currently does not clean up task files on the
|
||||
-- coordinator
|
||||
-- See this issue for details: https://github.com/citusdata/citus/issues/3996
|
||||
-- BEGIN;
|
||||
-- SET citus.enable_repartition_joins TO ON;
|
||||
-- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
||||
-- ROLLBACK;
|
||||
-- repartition queries should work fine
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
-- trigger local execution
|
||||
|
@ -446,6 +456,7 @@ BEGIN;
|
|||
-- copying task
|
||||
INSERT INTO dist_table SELECT a + 1 FROM dist_table;
|
||||
ROLLBACK;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
BEGIN;
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
CREATE TABLE dist_table1(a int);
|
||||
|
@ -466,11 +477,12 @@ RESET citus.enable_cte_inlining;
|
|||
DELETE FROM test;
|
||||
DROP TABLE test;
|
||||
DROP TABLE dist_table;
|
||||
DROP TABLE ref;
|
||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
||||
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
|
||||
PL/pgSQL function citus_drop_trigger() line 19 at PERFORM
|
||||
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
DETAIL: drop cascades to table ref
|
||||
drop cascades to table ref_1503016
|
||||
drop cascades to table local
|
||||
NOTICE: drop cascades to table local
|
||||
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -34,6 +34,55 @@ CREATE TABLE local(c int, d int);
|
|||
INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
|
||||
INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8);
|
||||
INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
|
||||
-- Check repartion joins are supported
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
SET citus.task_assignment_policy TO 'round-robin';
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
RESET citus.enable_repartition_joins;
|
||||
-- connect to the follower and check that a simple select query works, the follower
|
||||
-- is still in the default cluster and will send queries to the primary nodes
|
||||
\c - - - :follower_master_port
|
||||
|
@ -102,15 +151,29 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
|
|||
7 | 8 | 7 | 8
|
||||
(2 rows)
|
||||
|
||||
-- Check repartion joins are support
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: floating-point exception
|
||||
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: floating-point exception
|
||||
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
SET citus.task_assignment_policy TO 'round-robin';
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
RESET citus.enable_repartition_joins;
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
-- Confirm that dummy placements work
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
count
|
||||
|
@ -136,6 +199,30 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
RESET citus.task_assignment_policy;
|
||||
-- now, connect to the follower but tell it to use secondary nodes. There are no
|
||||
-- secondary nodes so this should fail.
|
||||
|
@ -226,15 +313,16 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
|
|||
7 | 8 | 7 | 8
|
||||
(2 rows)
|
||||
|
||||
-- Check repartion joins are support
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: floating-point exception
|
||||
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: floating-point exception
|
||||
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
RESET citus.enable_repartition_joins;
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
-- Confirm that dummy placements work
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
count
|
||||
|
|
|
@ -33,6 +33,21 @@ SELECT create_distributed_table('stock','s_w_id');
|
|||
(1 row)
|
||||
|
||||
INSERT INTO stock SELECT c, c, c FROM generate_series(1, 5) as c;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
RESET citus.enable_repartition_joins;
|
||||
-- connect to the follower and check that a simple select query works, the follower
|
||||
-- is still in the default cluster and will send queries to the primary nodes
|
||||
\c - - - :follower_master_port
|
||||
|
@ -100,6 +115,14 @@ order by s_i_id;
|
|||
5 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is read-only
|
||||
SELECT
|
||||
node_name, node_port
|
||||
FROM
|
||||
|
|
|
@ -101,15 +101,55 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
|
|||
7 | 8 | 7 | 8
|
||||
(2 rows)
|
||||
|
||||
-- Check repartion joins are support
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: floating-point exception
|
||||
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
|
||||
-- Check repartion joins are supported
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
ERROR: floating-point exception
|
||||
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.task_assignment_policy TO 'round-robin';
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
x | y | x | y
|
||||
---------------------------------------------------------------------
|
||||
2 | 7 | 1 | 2
|
||||
4 | 5 | 3 | 4
|
||||
5 | 6 | 4 | 5
|
||||
(3 rows)
|
||||
|
||||
RESET citus.enable_repartition_joins;
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
-- INSERT SELECT router
|
||||
BEGIN;
|
||||
INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1;
|
||||
|
@ -226,6 +266,51 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
|||
(0 rows)
|
||||
|
||||
RESET citus.task_assignment_policy;
|
||||
-- single node task tracker tests:
|
||||
SET citus.task_executor_type to 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO ref(a, b) SELECT x, y FROM test;
|
||||
SELECT count(*) from ref;
|
||||
ERROR: cannot switch local execution status from local execution required to local execution disabled since it can cause visibility problems in the current transaction
|
||||
ROLLBACK;
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO ref(a, b) SELECT c, d FROM local;
|
||||
SELECT count(*) from ref;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO local(c, d) SELECT x, y FROM test;
|
||||
SELECT count(*) from local;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
8
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO local(c, d) SELECT a, b FROM ref;
|
||||
SELECT count(*) from local;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
RESET citus.task_executor_type;
|
||||
-- Cleanup
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA single_node CASCADE;
|
||||
|
|
|
@ -87,13 +87,14 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none');
|
|||
SELECT count(*) FROM dist_table;
|
||||
ROLLBACK;
|
||||
|
||||
-- Commented out since it currently does not clean up task files on the
|
||||
-- coordinator
|
||||
-- See this issue for details: https://github.com/citusdata/citus/issues/3996
|
||||
-- BEGIN;
|
||||
-- SET citus.enable_repartition_joins TO ON;
|
||||
-- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
||||
-- ROLLBACK;
|
||||
-- repartition queries should work fine
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
||||
|
||||
BEGIN;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
|
@ -190,6 +191,7 @@ BEGIN;
|
|||
-- copying task
|
||||
INSERT INTO dist_table SELECT a + 1 FROM dist_table;
|
||||
ROLLBACK;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
BEGIN;
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
|
@ -204,6 +206,7 @@ RESET citus.enable_cte_inlining;
|
|||
DELETE FROM test;
|
||||
DROP TABLE test;
|
||||
DROP TABLE dist_table;
|
||||
DROP TABLE ref;
|
||||
|
||||
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
||||
|
||||
|
|
|
@ -18,6 +18,27 @@ INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
|
|||
INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8);
|
||||
INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
|
||||
|
||||
-- Check repartion joins are supported
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
|
||||
SET citus.task_assignment_policy TO 'round-robin';
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
RESET citus.enable_repartition_joins;
|
||||
|
||||
|
||||
|
||||
-- connect to the follower and check that a simple select query works, the follower
|
||||
-- is still in the default cluster and will send queries to the primary nodes
|
||||
\c - - - :follower_master_port
|
||||
|
@ -34,11 +55,23 @@ SELECT count(*) FROM local;
|
|||
SELECT * FROM local ORDER BY c;
|
||||
SELECT * FROM ref, local WHERE a = c ORDER BY a;
|
||||
|
||||
-- Check repartion joins are support
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'round-robin';
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
RESET citus.enable_repartition_joins;
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
|
||||
-- Confirm that dummy placements work
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
|
@ -47,6 +80,12 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
|||
SET citus.task_assignment_policy TO 'round-robin';
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
||||
RESET citus.task_assignment_policy;
|
||||
|
||||
|
||||
|
@ -82,11 +121,12 @@ SELECT count(*) FROM local;
|
|||
SELECT * FROM local ORDER BY c;
|
||||
SELECT * FROM ref, local WHERE a = c ORDER BY a;
|
||||
|
||||
-- Check repartion joins are support
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
RESET citus.enable_repartition_joins;
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
|
||||
-- Confirm that dummy placements work
|
||||
SELECT count(*) FROM test WHERE false;
|
||||
|
|
|
@ -21,6 +21,14 @@ SELECT create_distributed_table('stock','s_w_id');
|
|||
|
||||
INSERT INTO stock SELECT c, c, c FROM generate_series(1, 5) as c;
|
||||
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
|
||||
SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
|
||||
RESET citus.enable_repartition_joins;
|
||||
|
||||
|
||||
-- connect to the follower and check that a simple select query works, the follower
|
||||
-- is still in the default cluster and will send queries to the primary nodes
|
||||
|
@ -66,6 +74,13 @@ group by s_i_id
|
|||
having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from stock)
|
||||
order by s_i_id;
|
||||
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
|
||||
SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
|
||||
|
||||
|
||||
SELECT
|
||||
node_name, node_port
|
||||
|
|
|
@ -33,11 +33,24 @@ SELECT count(*) FROM local;
|
|||
SELECT * FROM local ORDER BY c;
|
||||
SELECT * FROM ref, local WHERE a = c ORDER BY a;
|
||||
|
||||
-- Check repartion joins are support
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
-- Check repartion joins are supported
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'round-robin';
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'greedy';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
SET citus.task_assignment_policy TO 'first-replica';
|
||||
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
||||
|
||||
RESET citus.enable_repartition_joins;
|
||||
RESET citus.enable_single_hash_repartition_joins;
|
||||
|
||||
-- INSERT SELECT router
|
||||
BEGIN;
|
||||
|
@ -102,6 +115,35 @@ SELECT count(*) FROM test WHERE false;
|
|||
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
||||
RESET citus.task_assignment_policy;
|
||||
|
||||
-- single node task tracker tests:
|
||||
SET citus.task_executor_type to 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO ref(a, b) SELECT x, y FROM test;
|
||||
SELECT count(*) from ref;
|
||||
ROLLBACK;
|
||||
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO ref(a, b) SELECT c, d FROM local;
|
||||
SELECT count(*) from ref;
|
||||
ROLLBACK;
|
||||
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO local(c, d) SELECT x, y FROM test;
|
||||
SELECT count(*) from local;
|
||||
ROLLBACK;
|
||||
|
||||
-- INSERT SELECT from distributed table to local table
|
||||
BEGIN;
|
||||
INSERT INTO local(c, d) SELECT a, b FROM ref;
|
||||
SELECT count(*) from local;
|
||||
ROLLBACK;
|
||||
|
||||
RESET citus.task_executor_type;
|
||||
|
||||
-- Cleanup
|
||||
SET client_min_messages TO WARNING;
|
||||
|
|
Loading…
Reference in New Issue