mirror of https://github.com/citusdata/citus.git
Merge remote-tracking branch 'origin/master' into normalized-test-output
commit
4a20ba3bfc
|
@ -35,6 +35,7 @@
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
#include "distributed/distributed_planner.h"
|
#include "distributed/distributed_planner.h"
|
||||||
|
#include "distributed/insert_select_planner.h"
|
||||||
#include "distributed/multi_physical_planner.h" /* only to use some utility functions */
|
#include "distributed/multi_physical_planner.h" /* only to use some utility functions */
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_router_planner.h"
|
#include "distributed/multi_router_planner.h"
|
||||||
|
@ -52,6 +53,7 @@
|
||||||
#else
|
#else
|
||||||
#include "optimizer/clauses.h"
|
#include "optimizer/clauses.h"
|
||||||
#endif
|
#endif
|
||||||
|
#include "tcop/pquery.h"
|
||||||
|
|
||||||
bool EnableFastPathRouterPlanner = true;
|
bool EnableFastPathRouterPlanner = true;
|
||||||
|
|
||||||
|
@ -95,7 +97,6 @@ FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams)
|
||||||
parse->jointree->quals =
|
parse->jointree->quals =
|
||||||
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
|
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
|
||||||
|
|
||||||
|
|
||||||
PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery);
|
PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
@ -126,7 +127,9 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
||||||
/* there is only a single relation rte */
|
/* there is only a single relation rte */
|
||||||
seqScanNode->scanrelid = 1;
|
seqScanNode->scanrelid = 1;
|
||||||
|
|
||||||
plan->targetlist = copyObject(parse->targetList);
|
plan->targetlist =
|
||||||
|
copyObject(FetchStatementTargetList((Node *) parse));
|
||||||
|
|
||||||
plan->qual = NULL;
|
plan->qual = NULL;
|
||||||
plan->lefttree = NULL;
|
plan->lefttree = NULL;
|
||||||
plan->righttree = NULL;
|
plan->righttree = NULL;
|
||||||
|
@ -139,6 +142,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
||||||
|
|
||||||
result->rtable = copyObject(parse->rtable);
|
result->rtable = copyObject(parse->rtable);
|
||||||
result->planTree = (Plan *) plan;
|
result->planTree = (Plan *) plan;
|
||||||
|
result->hasReturning = (parse->returningList != NIL);
|
||||||
|
|
||||||
Oid relationId = ExtractFirstDistributedTableId(parse);
|
Oid relationId = ExtractFirstDistributedTableId(parse);
|
||||||
result->relationOids = list_make1_oid(relationId);
|
result->relationOids = list_make1_oid(relationId);
|
||||||
|
@ -158,7 +162,8 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
||||||
* and it should be ANDed with any other filters. Also, the distribution
|
* and it should be ANDed with any other filters. Also, the distribution
|
||||||
* key should only exists once in the WHERE clause. So basically,
|
* key should only exists once in the WHERE clause. So basically,
|
||||||
* SELECT ... FROM dist_table WHERE dist_key = X
|
* SELECT ... FROM dist_table WHERE dist_key = X
|
||||||
* - No returning for UPDATE/DELETE queries
|
* - All INSERT statements (including multi-row INSERTs) as long as the commands
|
||||||
|
* don't have any sublinks/CTEs etc
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
FastPathRouterQuery(Query *query)
|
FastPathRouterQuery(Query *query)
|
||||||
|
@ -171,22 +176,27 @@ FastPathRouterQuery(Query *query)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(query->commandType == CMD_SELECT || query->commandType == CMD_UPDATE ||
|
/*
|
||||||
query->commandType == CMD_DELETE))
|
* We want to deal with only very simple queries. Some of the
|
||||||
|
* checks might be too restrictive, still we prefer this way.
|
||||||
|
*/
|
||||||
|
if (query->cteList != NIL || query->hasSubLinks ||
|
||||||
|
query->setOperations != NULL || query->hasTargetSRFs ||
|
||||||
|
query->hasModifyingCTE)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
if (CheckInsertSelectQuery(query))
|
||||||
* We want to deal with only very simple select queries. Some of the
|
|
||||||
* checks might be too restrictive, still we prefer this way.
|
|
||||||
*/
|
|
||||||
if (query->cteList != NIL || query->returningList != NIL ||
|
|
||||||
query->hasSubLinks || query->setOperations != NULL ||
|
|
||||||
query->hasTargetSRFs || query->hasModifyingCTE)
|
|
||||||
{
|
{
|
||||||
|
/* we don't support INSERT..SELECT in the fast-path */
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
else if (query->commandType == CMD_INSERT)
|
||||||
|
{
|
||||||
|
/* we don't need to do any further checks, all INSERTs are fast-path */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/* make sure that the only range table in FROM clause */
|
/* make sure that the only range table in FROM clause */
|
||||||
if (list_length(query->rtable) != 1)
|
if (list_length(query->rtable) != 1)
|
||||||
|
|
|
@ -70,7 +70,6 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
||||||
selectPartitionColumnTableId);
|
selectPartitionColumnTableId);
|
||||||
static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse);
|
static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse);
|
||||||
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
|
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
|
||||||
static bool CheckInsertSelectQuery(Query *query);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -129,7 +128,7 @@ InsertSelectIntoLocalTable(Query *query)
|
||||||
* This function is inspired from getInsertSelectQuery() on
|
* This function is inspired from getInsertSelectQuery() on
|
||||||
* rewrite/rewriteManip.c.
|
* rewrite/rewriteManip.c.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
CheckInsertSelectQuery(Query *query)
|
CheckInsertSelectQuery(Query *query)
|
||||||
{
|
{
|
||||||
CmdType commandType = query->commandType;
|
CmdType commandType = query->commandType;
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
|
|
||||||
extern bool InsertSelectIntoDistributedTable(Query *query);
|
extern bool InsertSelectIntoDistributedTable(Query *query);
|
||||||
|
extern bool CheckInsertSelectQuery(Query *query);
|
||||||
extern bool InsertSelectIntoLocalTable(Query *query);
|
extern bool InsertSelectIntoLocalTable(Query *query);
|
||||||
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
|
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
|
||||||
RangeTblEntry *insertRte,
|
RangeTblEntry *insertRte,
|
||||||
|
|
|
@ -3,18 +3,15 @@
|
||||||
-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE
|
-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE
|
||||||
-- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR
|
-- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR
|
||||||
-- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND
|
-- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND
|
||||||
-- WINDOWS SUPPORT, FAILURES IN TASK-TRACKER EXECUTOR
|
-- WINDOWS SUPPORT SO WE DISABLE THIS TEST ON WINDOWS
|
||||||
-- SO WE DISABLE THIS TEST ON WINDOWS
|
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%';
|
SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%';
|
||||||
pg_ls_dir
|
pg_ls_dir
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
SELECT run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'$$);
|
SELECT * FROM run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') r WHERE citus_version() NOT ILIKE '%windows%'$$) WHERE result <> '';
|
||||||
run_command_on_workers
|
nodename | nodeport | success | result
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(localhost,57637,t,"")
|
(0 rows)
|
||||||
(localhost,57638,t,"")
|
|
||||||
(2 rows)
|
|
||||||
|
|
||||||
|
|
|
@ -105,15 +105,44 @@ ERROR: modifying the partition value of rows is not allowed
|
||||||
UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1;
|
UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1;
|
||||||
DEBUG: modifying the partition value of rows is not allowed
|
DEBUG: modifying the partition value of rows is not allowed
|
||||||
ERROR: modifying the partition value of rows is not allowed
|
ERROR: modifying the partition value of rows is not allowed
|
||||||
-- returning is not supported via fast-path
|
-- returning is supported via fast-path
|
||||||
|
INSERT INTO modify_fast_path (key, value_1) VALUES (1,1);
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DETAIL: distribution column value: 1
|
||||||
DELETE FROM modify_fast_path WHERE key = 1 RETURNING *;
|
DELETE FROM modify_fast_path WHERE key = 1 RETURNING *;
|
||||||
|
DEBUG: Distributed planning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
DETAIL: distribution column value: 1
|
DETAIL: distribution column value: 1
|
||||||
key | value_1 | value_2
|
key | value_1 | value_2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
1 | 1 |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, key;
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DETAIL: distribution column value: 2
|
||||||
|
value_1 | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16;
|
||||||
|
DEBUG: Distributed planning for a fast-path router query
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DETAIL: distribution column value: 2
|
||||||
|
?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
15 | 16
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- still, non-immutable functions are not supported
|
||||||
|
INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key;
|
||||||
|
DEBUG: non-IMMUTABLE functions are not allowed in the RETURNING clause
|
||||||
|
ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause
|
||||||
-- modifying ctes are not supported via fast-path
|
-- modifying ctes are not supported via fast-path
|
||||||
WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2;
|
WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2;
|
||||||
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
|
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
|
||||||
|
|
|
@ -644,7 +644,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$
|
||||||
END;
|
END;
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
CALL only_local_execution_with_params(1);
|
CALL only_local_execution_with_params(1);
|
||||||
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES ($1, '11'::text, 21) ON CONFLICT(key) DO UPDATE SET value = '29'::text
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '29'::text
|
||||||
CONTEXT: SQL statement "INSERT INTO distributed_table VALUES ($1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29'"
|
CONTEXT: SQL statement "INSERT INTO distributed_table VALUES ($1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29'"
|
||||||
PL/pgSQL function only_local_execution_with_params(integer) line 4 at SQL statement
|
PL/pgSQL function only_local_execution_with_params(integer) line 4 at SQL statement
|
||||||
LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
@ -905,6 +905,163 @@ LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_e
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15;
|
||||||
|
PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15;
|
||||||
|
BEGIN;
|
||||||
|
-- 6 local execution without params
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- 6 local executions with params
|
||||||
|
EXECUTE local_insert_prepare_param(1);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_param(5);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5 | 2928 | 22 | 6 | 292830 | 330
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_param(6);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6 | 11 | 21 | 7 | 1130 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_param(1);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2928 | 21 | 2 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_param(5);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5 | 2928 | 22 | 6 | 292830 | 330
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE local_insert_prepare_param(6);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6, '11'::text, '21'::bigint) ON CONFLICT(key) DO UPDATE SET value = '2928'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age, (citus_table_alias.key OPERATOR(pg_catalog.+) 1), (citus_table_alias.value OPERATOR(pg_catalog.||) '30'::text), (citus_table_alias.age OPERATOR(pg_catalog.*) 15)
|
||||||
|
key | value | age | ?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6 | 2928 | 21 | 7 | 292830 | 315
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- followed by a non-local execution
|
||||||
|
EXECUTE remote_prepare_param(2);
|
||||||
|
LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2)
|
||||||
|
LOG: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE (key OPERATOR(pg_catalog.<>) 2)
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
PREPARE local_multi_row_insert_prepare_no_param AS
|
||||||
|
INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;
|
||||||
|
PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS
|
||||||
|
INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;;
|
||||||
|
PREPARE local_multi_row_insert_prepare_params(int,int) AS
|
||||||
|
INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;;
|
||||||
|
INSERT INTO reference_table VALUES (11);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (11)
|
||||||
|
BEGIN;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(1,6);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(1,5);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'55'::text,'21'::bigint), (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(6,5);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,1);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,6);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470003 AS citus_table_alias (key, value, age) VALUES (6,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,1);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint), (1,'15'::text,'33'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
-- one task is remote
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,11);
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (5,'55'::text,'21'::bigint) ON CONFLICT(key) WHERE ((key OPERATOR(pg_catalog.>) 3) AND (key OPERATOR(pg_catalog.<) 4)) DO UPDATE SET value = ('88'::text OPERATOR(pg_catalog.||) excluded.value)
|
||||||
|
ROLLBACK;
|
||||||
-- failures of local execution should rollback both the
|
-- failures of local execution should rollback both the
|
||||||
-- local execution and remote executions
|
-- local execution and remote executions
|
||||||
-- fail on a local execution
|
-- fail on a local execution
|
||||||
|
@ -1110,6 +1267,102 @@ LOG: executing the command locally: SELECT key, ser, ts, collection_id, value F
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
TRUNCATE collections_list;
|
||||||
|
-- make sure that even if local execution is used, the sequence values
|
||||||
|
-- are generated locally
|
||||||
|
ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE;
|
||||||
|
PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser;
|
||||||
|
SELECT setval('collections_list_key_seq', 4);
|
||||||
|
setval
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('5'::bigint, '3940649673949187'::bigint, 0) RETURNING key, ser
|
||||||
|
key | ser
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5 | 3940649673949187
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT setval('collections_list_key_seq', 5);
|
||||||
|
setval
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('6'::bigint, '3940649673949188'::bigint, 0) RETURNING key, ser
|
||||||
|
key | ser
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6 | 3940649673949188
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT setval('collections_list_key_seq', 499);
|
||||||
|
setval
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
499
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('500'::bigint, '3940649673949189'::bigint, 0) RETURNING key, ser
|
||||||
|
key | ser
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
500 | 3940649673949189
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT setval('collections_list_key_seq', 700);
|
||||||
|
setval
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
700
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('701'::bigint, '3940649673949190'::bigint, 0) RETURNING key, ser
|
||||||
|
key | ser
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
701 | 3940649673949190
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT setval('collections_list_key_seq', 708);
|
||||||
|
setval
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
708
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470011 (key, ser, collection_id) VALUES ('709'::bigint, '3940649673949191'::bigint, 0) RETURNING key, ser
|
||||||
|
key | ser
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
709 | 3940649673949191
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT setval('collections_list_key_seq', 709);
|
||||||
|
setval
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
709
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
LOG: executing the command locally: INSERT INTO local_shard_execution.collections_list_1470009 (key, ser, collection_id) VALUES ('710'::bigint, '3940649673949192'::bigint, 0) RETURNING key, ser
|
||||||
|
key | ser
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
710 | 3940649673949192
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- and, one remote test
|
||||||
|
SELECT setval('collections_list_key_seq', 10);
|
||||||
|
setval
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
10
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
key | ser
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
11 | 3940649673949193
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- the final queries for the following CTEs are going to happen on the intermediate results only
|
-- the final queries for the following CTEs are going to happen on the intermediate results only
|
||||||
-- one of them will be executed remotely, and the other is locally
|
-- one of them will be executed remotely, and the other is locally
|
||||||
-- Citus currently doesn't allow using task_assignment_policy for intermediate results
|
-- Citus currently doesn't allow using task_assignment_policy for intermediate results
|
||||||
|
|
|
@ -809,6 +809,13 @@ SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SELECT run_command_on_workers($$SELECT task_tracker_cleanup_job(42);$$);
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(localhost,57637,t,"")
|
||||||
|
(localhost,57638,t,"")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
DROP SCHEMA full_access_user_schema CASCADE;
|
DROP SCHEMA full_access_user_schema CASCADE;
|
||||||
NOTICE: drop cascades to 4 other objects
|
NOTICE: drop cascades to 4 other objects
|
||||||
DETAIL: drop cascades to table full_access_user_schema.t1
|
DETAIL: drop cascades to table full_access_user_schema.t1
|
||||||
|
|
|
@ -153,6 +153,7 @@ INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_
|
||||||
-- this errors out since there is no unique constraint on partition key
|
-- this errors out since there is no unique constraint on partition key
|
||||||
INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING;
|
INSERT INTO upsert_test_2 (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING;
|
||||||
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
|
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
-- create another table
|
-- create another table
|
||||||
CREATE TABLE upsert_test_3
|
CREATE TABLE upsert_test_3
|
||||||
(
|
(
|
||||||
|
@ -171,6 +172,7 @@ SELECT create_distributed_table('upsert_test_3', 'part_key', 'hash');
|
||||||
-- since there are no unique indexes, error-out
|
-- since there are no unique indexes, error-out
|
||||||
INSERT INTO upsert_test_3 VALUES (1, 0) ON CONFLICT(part_key) DO UPDATE SET count = upsert_test_3.count + 1;
|
INSERT INTO upsert_test_3 VALUES (1, 0) ON CONFLICT(part_key) DO UPDATE SET count = upsert_test_3.count + 1;
|
||||||
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
|
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
-- create another table
|
-- create another table
|
||||||
CREATE TABLE upsert_test_4
|
CREATE TABLE upsert_test_4
|
||||||
(
|
(
|
||||||
|
|
|
@ -101,3 +101,10 @@ SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107');
|
||||||
ERROR: could not stat file "base/pgsql_job_cache/job_401010/task_801107": No such file or directory
|
ERROR: could not stat file "base/pgsql_job_cache/job_401010/task_801107": No such file or directory
|
||||||
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
|
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
|
||||||
ERROR: could not stat file "base/pgsql_job_cache/job_401010": No such file or directory
|
ERROR: could not stat file "base/pgsql_job_cache/job_401010": No such file or directory
|
||||||
|
-- Also clean up worker_cleanup_job_schema_cache job
|
||||||
|
SELECT task_tracker_cleanup_job(2);
|
||||||
|
task_tracker_cleanup_job
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
-- Clear job directory used by previous tests
|
||||||
|
\set JobId 201010
|
||||||
|
SELECT task_tracker_cleanup_job(:JobId);
|
||||||
|
task_tracker_cleanup_job
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
|
@ -30,3 +30,6 @@ test: failure_savepoints
|
||||||
test: failure_multi_row_insert
|
test: failure_multi_row_insert
|
||||||
test: failure_mx_metadata_sync
|
test: failure_mx_metadata_sync
|
||||||
test: failure_connection_establishment
|
test: failure_connection_establishment
|
||||||
|
|
||||||
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
|
@ -3,3 +3,6 @@ test: multi_follower_select_statements
|
||||||
test: multi_follower_dml
|
test: multi_follower_dml
|
||||||
test: multi_follower_configure_followers
|
test: multi_follower_configure_followers
|
||||||
test: multi_follower_task_tracker
|
test: multi_follower_task_tracker
|
||||||
|
|
||||||
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
|
@ -42,3 +42,6 @@ test: multi_mx_transaction_recovery
|
||||||
test: multi_mx_modifying_xacts
|
test: multi_mx_modifying_xacts
|
||||||
test: multi_mx_explain
|
test: multi_mx_explain
|
||||||
test: multi_mx_reference_table
|
test: multi_mx_reference_table
|
||||||
|
|
||||||
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
|
@ -313,3 +313,8 @@ test: distributed_procedure
|
||||||
# deparsing logic tests
|
# deparsing logic tests
|
||||||
# ---------
|
# ---------
|
||||||
test: multi_deparse_function multi_deparse_procedure
|
test: multi_deparse_function multi_deparse_procedure
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
# ---------
|
||||||
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
|
@ -110,3 +110,7 @@ test: multi_drop_extension
|
||||||
# ----------
|
# ----------
|
||||||
test: multi_schema_support
|
test: multi_schema_support
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
# ----------
|
||||||
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
|
@ -4,9 +4,8 @@
|
||||||
-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE
|
-- THE REGRESSION TEST SUITE TO MAKE SURE THAT WE
|
||||||
-- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR
|
-- CLEAR ALL INTERMEDIATE RESULTS ON BOTH THE COORDINATOR
|
||||||
-- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND
|
-- AND ON THE WORKERS. HOWEVER, WE HAVE SOME ISSUES AROUND
|
||||||
-- WINDOWS SUPPORT, FAILURES IN TASK-TRACKER EXECUTOR
|
-- WINDOWS SUPPORT SO WE DISABLE THIS TEST ON WINDOWS
|
||||||
-- SO WE DISABLE THIS TEST ON WINDOWS
|
|
||||||
------
|
------
|
||||||
|
|
||||||
SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%';
|
SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%';
|
||||||
SELECT run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') WHERE citus_version() NOT ILIKE '%windows%'$$);
|
SELECT * FROM run_command_on_workers($$SELECT pg_ls_dir('base/pgsql_job_cache') r WHERE citus_version() NOT ILIKE '%windows%'$$) WHERE result <> '';
|
||||||
|
|
|
@ -53,8 +53,14 @@ UPDATE modify_fast_path SET key = 1::float WHERE key = 1;
|
||||||
UPDATE modify_fast_path SET key = 2 WHERE key = 1;
|
UPDATE modify_fast_path SET key = 2 WHERE key = 1;
|
||||||
UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1;
|
UPDATE modify_fast_path SET key = 2::numeric WHERE key = 1;
|
||||||
|
|
||||||
-- returning is not supported via fast-path
|
-- returning is supported via fast-path
|
||||||
|
INSERT INTO modify_fast_path (key, value_1) VALUES (1,1);
|
||||||
DELETE FROM modify_fast_path WHERE key = 1 RETURNING *;
|
DELETE FROM modify_fast_path WHERE key = 1 RETURNING *;
|
||||||
|
INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, key;
|
||||||
|
DELETE FROM modify_fast_path WHERE key = 2 RETURNING value_1 * 15, value_1::numeric * 16;
|
||||||
|
|
||||||
|
-- still, non-immutable functions are not supported
|
||||||
|
INSERT INTO modify_fast_path (key, value_1) VALUES (2,1) RETURNING value_1, random() * key;
|
||||||
|
|
||||||
-- modifying ctes are not supported via fast-path
|
-- modifying ctes are not supported via fast-path
|
||||||
WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2;
|
WITH t1 AS (DELETE FROM modify_fast_path WHERE key = 1), t2 AS (SELECT * FROM modify_fast_path) SELECT * FROM t2;
|
||||||
|
|
|
@ -485,6 +485,65 @@ BEGIN;
|
||||||
EXECUTE remote_prepare_param(1);
|
EXECUTE remote_prepare_param(1);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
PREPARE local_insert_prepare_no_param AS INSERT INTO distributed_table VALUES (1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15;
|
||||||
|
PREPARE local_insert_prepare_param (int) AS INSERT INTO distributed_table VALUES ($1+0*random(), '11',21::int) ON CONFLICT(key) DO UPDATE SET value = '29' || '28' RETURNING *, key + 1, value || '30', age * 15;
|
||||||
|
BEGIN;
|
||||||
|
-- 6 local execution without params
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
EXECUTE local_insert_prepare_no_param;
|
||||||
|
|
||||||
|
-- 6 local executions with params
|
||||||
|
EXECUTE local_insert_prepare_param(1);
|
||||||
|
EXECUTE local_insert_prepare_param(5);
|
||||||
|
EXECUTE local_insert_prepare_param(6);
|
||||||
|
EXECUTE local_insert_prepare_param(1);
|
||||||
|
EXECUTE local_insert_prepare_param(5);
|
||||||
|
EXECUTE local_insert_prepare_param(6);
|
||||||
|
|
||||||
|
-- followed by a non-local execution
|
||||||
|
EXECUTE remote_prepare_param(2);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
PREPARE local_multi_row_insert_prepare_no_param AS
|
||||||
|
INSERT INTO distributed_table VALUES (1,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;
|
||||||
|
|
||||||
|
PREPARE local_multi_row_insert_prepare_no_param_multi_shard AS
|
||||||
|
INSERT INTO distributed_table VALUES (6,'55', 21), (5,'15',33) ON CONFLICT (key) WHERE key > 3 AND key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;;
|
||||||
|
|
||||||
|
PREPARE local_multi_row_insert_prepare_params(int,int) AS
|
||||||
|
INSERT INTO distributed_table VALUES ($1,'55', 21), ($2,'15',33) ON CONFLICT (key) WHERE key > 3 and key < 4 DO UPDATE SET value = '88' || EXCLUDED.value;;
|
||||||
|
INSERT INTO reference_table VALUES (11);
|
||||||
|
BEGIN;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param;
|
||||||
|
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
EXECUTE local_multi_row_insert_prepare_no_param_multi_shard;
|
||||||
|
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(1,6);
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(1,5);
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(6,5);
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,1);
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,6);
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,1);
|
||||||
|
|
||||||
|
-- one task is remote
|
||||||
|
EXECUTE local_multi_row_insert_prepare_params(5,11);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- failures of local execution should rollback both the
|
-- failures of local execution should rollback both the
|
||||||
-- local execution and remote executions
|
-- local execution and remote executions
|
||||||
|
@ -619,6 +678,32 @@ BEGIN;
|
||||||
SELECT * FROM collections_list ORDER BY 1,2,3,4;
|
SELECT * FROM collections_list ORDER BY 1,2,3,4;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
TRUNCATE collections_list;
|
||||||
|
|
||||||
|
-- make sure that even if local execution is used, the sequence values
|
||||||
|
-- are generated locally
|
||||||
|
ALTER SEQUENCE collections_list_key_seq NO MINVALUE NO MAXVALUE;
|
||||||
|
|
||||||
|
PREPARE serial_prepared_local AS INSERT INTO collections_list (collection_id) VALUES (0) RETURNING key, ser;
|
||||||
|
|
||||||
|
SELECT setval('collections_list_key_seq', 4);
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
SELECT setval('collections_list_key_seq', 5);
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
SELECT setval('collections_list_key_seq', 499);
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
SELECT setval('collections_list_key_seq', 700);
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
SELECT setval('collections_list_key_seq', 708);
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
SELECT setval('collections_list_key_seq', 709);
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
|
||||||
|
-- and, one remote test
|
||||||
|
SELECT setval('collections_list_key_seq', 10);
|
||||||
|
EXECUTE serial_prepared_local;
|
||||||
|
|
||||||
-- the final queries for the following CTEs are going to happen on the intermediate results only
|
-- the final queries for the following CTEs are going to happen on the intermediate results only
|
||||||
-- one of them will be executed remotely, and the other is locally
|
-- one of them will be executed remotely, and the other is locally
|
||||||
-- Citus currently doesn't allow using task_assignment_policy for intermediate results
|
-- Citus currently doesn't allow using task_assignment_policy for intermediate results
|
||||||
|
|
|
@ -488,6 +488,7 @@ RESET ROLE;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
|
SELECT run_command_on_workers($$SELECT task_tracker_cleanup_job(42);$$);
|
||||||
|
|
||||||
DROP SCHEMA full_access_user_schema CASCADE;
|
DROP SCHEMA full_access_user_schema CASCADE;
|
||||||
DROP TABLE
|
DROP TABLE
|
||||||
|
|
|
@ -46,3 +46,6 @@ SELECT task_tracker_task_status(:JobId, :RunningTaskId);
|
||||||
|
|
||||||
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107');
|
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107');
|
||||||
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
|
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
|
||||||
|
|
||||||
|
-- Also clean up worker_cleanup_job_schema_cache job
|
||||||
|
SELECT task_tracker_cleanup_job(2);
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
-- Clear job directory used by previous tests
|
||||||
|
|
||||||
|
\set JobId 201010
|
||||||
|
|
||||||
|
SELECT task_tracker_cleanup_job(:JobId);
|
|
@ -19,6 +19,7 @@ test: worker_hash_partition worker_hash_partition_complex
|
||||||
test: worker_merge_range_files worker_merge_hash_files
|
test: worker_merge_range_files worker_merge_hash_files
|
||||||
test: worker_binary_data_partition worker_null_data_partition
|
test: worker_binary_data_partition worker_null_data_partition
|
||||||
test: worker_check_invalid_arguments
|
test: worker_check_invalid_arguments
|
||||||
|
test: worker_remove_files
|
||||||
|
|
||||||
# ----------
|
# ----------
|
||||||
# All task tracker tests use the following tables
|
# All task tracker tests use the following tables
|
||||||
|
@ -26,3 +27,8 @@ test: worker_check_invalid_arguments
|
||||||
test: task_tracker_create_table
|
test: task_tracker_create_table
|
||||||
test: task_tracker_assign_task task_tracker_partition_task
|
test: task_tracker_assign_task task_tracker_partition_task
|
||||||
test: task_tracker_cleanup_job
|
test: task_tracker_cleanup_job
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
# ---------
|
||||||
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
Loading…
Reference in New Issue