Merge pull request #2339 from citusdata/fix_modifying_cte

Make sure that modifying CTEs always use the correct execution mode
pull/2333/head
Önder Kalacı 2018-08-23 15:44:38 +03:00 committed by GitHub
commit 6cea01620a
6 changed files with 469 additions and 123 deletions

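For context before the diff: a minimal sketch of the scenario this PR targets, using the table names from the regression tests added below. A data-modifying CTE on a reference table must switch the rest of the statement (and transaction) to sequential execution, because the distributed table has a foreign key to the reference table.

-- Minimal sketch of the scenario exercised by this PR's regression tests
-- (table names follow the tests added below; not a new test case).
CREATE TABLE reference_table(id int PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('distributed_table', 'id');
ALTER TABLE distributed_table
    ADD CONSTRAINT fkey_delete FOREIGN KEY (value_1)
    REFERENCES reference_table(id) ON DELETE CASCADE;

-- The CTE deletes from the reference table, so Citus must switch to
-- sequential execution before touching distributed_table; otherwise
-- parallel access could cause distributed deadlocks via the foreign key.
WITH t1 AS (DELETE FROM reference_table RETURNING id)
DELETE FROM distributed_table USING t1 WHERE value_1 = t1.id RETURNING *;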
View File

@@ -88,19 +88,10 @@ static CustomExecMethods TaskTrackerCustomExecMethods = {
     .ExplainCustomScan = CitusExplainScan
 };
 
-static CustomExecMethods RouterSequentialModifyCustomExecMethods = {
-    .CustomName = "RouterSequentialModifyScan",
-    .BeginCustomScan = CitusModifyBeginScan,
-    .ExecCustomScan = RouterSequentialModifyExecScan,
-    .EndCustomScan = CitusEndScan,
-    .ReScanCustomScan = CitusReScan,
-    .ExplainCustomScan = CitusExplainScan
-};
-
-static CustomExecMethods RouterMultiModifyCustomExecMethods = {
-    .CustomName = "RouterMultiModifyScan",
+static CustomExecMethods RouterModifyCustomExecMethods = {
+    .CustomName = "RouterModifyScan",
     .BeginCustomScan = CitusModifyBeginScan,
-    .ExecCustomScan = RouterMultiModifyExecScan,
+    .ExecCustomScan = RouterModifyExecScan,
     .EndCustomScan = CitusEndScan,
     .ReScanCustomScan = CitusReScan,
     .ExplainCustomScan = CitusExplainScan
@@ -187,6 +178,8 @@ RouterCreateScan(CustomScan *scan)
     List *taskList = NIL;
     bool isModificationQuery = false;
 
+    List *relationRowLockList = NIL;
+
     scanState->executorType = MULTI_EXECUTOR_ROUTER;
     scanState->customScanState.ss.ps.type = T_CustomScanState;
     scanState->distributedPlan = GetDistributedPlan(scan);
@@ -194,47 +187,22 @@ RouterCreateScan(CustomScan *scan)
     distributedPlan = scanState->distributedPlan;
     workerJob = distributedPlan->workerJob;
     taskList = workerJob->taskList;
 
     isModificationQuery = IsModifyDistributedPlan(distributedPlan);
 
-    /* check whether query has at most one shard */
-    if (list_length(taskList) <= 1)
+    if (list_length(taskList) == 1)
     {
-        List *relationRowLockList = NIL;
-        if (list_length(taskList) == 1)
-        {
-            Task *task = (Task *) linitial(taskList);
-            relationRowLockList = task->relationRowLockList;
-        }
+        Task *task = (Task *) linitial(taskList);
 
-        /* if query is SELECT ... FOR UPDATE query, use modify logic */
-        if (isModificationQuery || relationRowLockList != NIL)
-        {
-            scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
-        }
-        else
-        {
-            scanState->customScanState.methods = &RouterSelectCustomExecMethods;
-        }
+        relationRowLockList = task->relationRowLockList;
     }
-    else
-    {
-        Assert(isModificationQuery);
 
-        if (IsMultiRowInsert(workerJob->jobQuery) ||
-            MultiShardConnectionType == SEQUENTIAL_CONNECTION)
-        {
-            /*
-             * Multi shard modifications while multi_shard_modify_mode equals
-             * to 'sequential' or Multi-row INSERT are executed sequentially
-             * instead of using parallel connections.
-             */
-            scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
-        }
-        else
-        {
-            scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods;
-        }
-    }
+    /* if query is SELECT ... FOR UPDATE query, use modify logic */
+    if (isModificationQuery || relationRowLockList != NIL)
+    {
+        scanState->customScanState.methods = &RouterModifyCustomExecMethods;
+    }
+    else
+    {
+        scanState->customScanState.methods = &RouterSelectCustomExecMethods;
+    }
 
     return (Node *) scanState;
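Note on the hunk above: RouterCreateScan now only picks between the select and modify scan methods, and the sequential-versus-parallel decision moves to execution time. The relationRowLockList check keeps SELECT ... FOR UPDATE on the modify path, as the comment says. An illustrative sketch, assuming the distributed_table defined in the tests below:

-- SELECT ... FOR UPDATE takes row locks on the worker, so it is routed
-- through the modify scan methods even though it does not change any rows
-- (illustrative sketch; assumes the tests' distributed_table).
SELECT * FROM distributed_table WHERE id = 1 FOR UPDATE;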

View File

@@ -96,6 +96,8 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static void AcquireExecutorMultiShardLocks(List *taskList);
 static bool RequiresConsistentSnapshot(Task *task);
+static void RouterMultiModifyExecScan(CustomScanState *node);
+static void RouterSequentialModifyExecScan(CustomScanState *node);
 static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
                                                Oid **parameterTypes,
                                                const char ***parameterValues);
@@ -534,12 +536,15 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
 /*
- * RouterSequentialModifyExecScan executes 0 or more modifications on a
- * distributed table sequentially and returns results if there are any.
- * Note that we also use this path for SELECT ... FOR UPDATE queries.
+ * RouterModifyExecScan executes a list of tasks on remote nodes, retrieves
+ * the results and, if RETURNING is used or SELECT FOR UPDATE executed,
+ * returns the results with a TupleTableSlot.
+ *
+ * The function can handle both single task query executions,
+ * sequential or parallel multi-task query executions.
  */
 TupleTableSlot *
-RouterSequentialModifyExecScan(CustomScanState *node)
+RouterModifyExecScan(CustomScanState *node)
 {
     CitusScanState *scanState = (CitusScanState *) node;
     TupleTableSlot *resultSlot = NULL;
@@ -547,63 +552,26 @@ RouterSequentialModifyExecScan(CustomScanState *node)
     if (!scanState->finishedRemoteScan)
     {
         DistributedPlan *distributedPlan = scanState->distributedPlan;
-        bool hasReturning = distributedPlan->hasReturning;
         Job *workerJob = distributedPlan->workerJob;
         List *taskList = workerJob->taskList;
-        ListCell *taskCell = NULL;
-        bool multipleTasks = list_length(taskList) > 1;
-        EState *executorState = scanState->customScanState.ss.ps.state;
-        bool taskListRequires2PC = TaskListRequires2PC(taskList);
-        bool alwaysThrowErrorOnFailure = false;
-        CmdType operation = scanState->distributedPlan->operation;
-
-        /*
-         * We could naturally handle function-based transactions (i.e. those using
-         * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
-         * customers already use functions that touch multiple shards from within
-         * a function, so we'll ignore functions for now.
-         */
-        if (IsTransactionBlock() || multipleTasks || taskListRequires2PC)
-        {
-            BeginOrContinueCoordinatedTransaction();
-
-            /*
-             * Although using two phase commit protocol is an independent decision than
-             * failing on any error, we prefer to couple them. Our motivation is that
-             * the failures are rare, and we prefer to avoid marking placements invalid
-             * in case of failures.
-             *
-             * For reference tables, we always set alwaysThrowErrorOnFailure since we
-             * absolutely want to avoid marking any placements invalid.
-             *
-             * We also cannot handle failures when there is RETURNING and there are more
-             * than one task to execute.
-             */
-            if (taskListRequires2PC)
-            {
-                CoordinatedTransactionUse2PC();
-
-                alwaysThrowErrorOnFailure = true;
-            }
-            else if (multipleTasks && hasReturning)
-            {
-                alwaysThrowErrorOnFailure = true;
-            }
-        }
+        bool parallelExecution = true;
 
         ExecuteSubPlans(distributedPlan);
 
-        foreach(taskCell, taskList)
+        if (list_length(taskList) <= 1 ||
+            IsMultiRowInsert(workerJob->jobQuery) ||
+            MultiShardConnectionType == SEQUENTIAL_CONNECTION)
         {
-            Task *task = (Task *) lfirst(taskCell);
-
-            /*
-             * Result is expected for SELECT ... FOR UPDATE queries as well.
-             */
-            executorState->es_processed +=
-                ExecuteSingleModifyTask(scanState, task, operation,
-                                        alwaysThrowErrorOnFailure,
-                                        hasReturning || task->relationRowLockList != NIL);
+            parallelExecution = false;
+        }
+
+        if (parallelExecution)
+        {
+            RouterMultiModifyExecScan(node);
+        }
+        else
+        {
+            RouterSequentialModifyExecScan(node);
         }
 
         scanState->finishedRemoteScan = true;
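The dispatch above chooses sequential execution when the job has at most one task, is a multi-row INSERT, or when the connection type is already sequential; everything else goes through the parallel multi-modify path. A hedged sketch of statements that would take the sequential branch, reusing the tests' distributed_table:

-- Single-task modification: routed to one shard, executed sequentially.
DELETE FROM distributed_table WHERE id = 1;

-- Multi-row INSERT: spans several shards but always runs sequentially.
INSERT INTO distributed_table (id, value_1) VALUES (1, 1), (2, 2), (3, 3);

-- Forcing sequential mode for an otherwise parallel multi-shard modification.
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
UPDATE distributed_table SET value_1 = value_1 + 1;
COMMIT;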
@@ -615,6 +583,75 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 }
 
 
+/*
+ * RouterSequentialModifyExecScan executes 0 or more modifications on a
+ * distributed table sequentially and stores them in custom scan's tuple
+ * store. Note that we also use this path for SELECT ... FOR UPDATE queries.
+ */
+static void
+RouterSequentialModifyExecScan(CustomScanState *node)
+{
+    CitusScanState *scanState = (CitusScanState *) node;
+    DistributedPlan *distributedPlan = scanState->distributedPlan;
+    bool hasReturning = distributedPlan->hasReturning;
+    Job *workerJob = distributedPlan->workerJob;
+    List *taskList = workerJob->taskList;
+    ListCell *taskCell = NULL;
+    bool multipleTasks = list_length(taskList) > 1;
+    EState *executorState = scanState->customScanState.ss.ps.state;
+    bool taskListRequires2PC = TaskListRequires2PC(taskList);
+    bool alwaysThrowErrorOnFailure = false;
+    CmdType operation = scanState->distributedPlan->operation;
+
+    Assert(!scanState->finishedRemoteScan);
+
+    /*
+     * We could naturally handle function-based transactions (i.e. those using
+     * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
+     * customers already use functions that touch multiple shards from within
+     * a function, so we'll ignore functions for now.
+     */
+    if (IsTransactionBlock() || multipleTasks || taskListRequires2PC)
+    {
+        BeginOrContinueCoordinatedTransaction();
+
+        /*
+         * Although using two phase commit protocol is an independent decision than
+         * failing on any error, we prefer to couple them. Our motivation is that
+         * the failures are rare, and we prefer to avoid marking placements invalid
+         * in case of failures.
+         *
+         * For reference tables, we always set alwaysThrowErrorOnFailure since we
+         * absolutely want to avoid marking any placements invalid.
+         *
+         * We also cannot handle failures when there is RETURNING and there are more
+         * than one task to execute.
+         */
+        if (taskListRequires2PC)
+        {
+            CoordinatedTransactionUse2PC();
+
+            alwaysThrowErrorOnFailure = true;
+        }
+        else if (multipleTasks && hasReturning)
+        {
+            alwaysThrowErrorOnFailure = true;
+        }
+    }
+
+    foreach(taskCell, taskList)
+    {
+        Task *task = (Task *) lfirst(taskCell);
+        bool expectResults = (hasReturning || task->relationRowLockList != NIL);
+
+        executorState->es_processed +=
+            ExecuteSingleModifyTask(scanState, task, operation,
+                                    alwaysThrowErrorOnFailure, expectResults);
+    }
+}
+
+
 /*
  * TaskListRequires2PC determines whether the given task list requires 2PC
  * because the tasks provided operates on a reference table or there are multiple
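As the comments above describe, the sequential path opens a coordinated transaction inside a transaction block, for multiple tasks, or when the task list requires 2PC, and it couples 2PC with erroring out rather than marking placements invalid. A hedged sketch of a statement that takes this route, reusing the tests' reference_table:

-- Reference table modifications require 2PC across all placements, so a
-- failure on any placement aborts the statement instead of invalidating
-- placements (illustrative sketch).
BEGIN;
INSERT INTO reference_table VALUES (42);
COMMIT;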
@@ -667,31 +704,20 @@ TaskListRequires2PC(List *taskList)
 /*
  * RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves
  * the results and, if RETURNING is used, stores them in custom scan's tuple store.
- * Then, it returns tuples one by one from this tuple store.
  */
-TupleTableSlot *
+static void
 RouterMultiModifyExecScan(CustomScanState *node)
 {
     CitusScanState *scanState = (CitusScanState *) node;
-    TupleTableSlot *resultSlot = NULL;
+    DistributedPlan *distributedPlan = scanState->distributedPlan;
+    Job *workerJob = distributedPlan->workerJob;
+    List *taskList = workerJob->taskList;
+    bool hasReturning = distributedPlan->hasReturning;
+    bool isModificationQuery = true;
 
-    if (!scanState->finishedRemoteScan)
-    {
-        DistributedPlan *distributedPlan = scanState->distributedPlan;
-        Job *workerJob = distributedPlan->workerJob;
-        List *taskList = workerJob->taskList;
-        bool hasReturning = distributedPlan->hasReturning;
-        bool isModificationQuery = true;
+    Assert(!scanState->finishedRemoteScan);
 
-        ExecuteSubPlans(distributedPlan);
-        ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning);
-
-        scanState->finishedRemoteScan = true;
-    }
-
-    resultSlot = ReturnTupleFromTuplestore(scanState);
-
-    return resultSlot;
+    ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning);
 }
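After this change RouterMultiModifyExecScan only runs the tasks; executing subplans and emitting tuples from the tuple store are handled by the shared RouterModifyExecScan, while ExecuteMultipleTasks still fills the tuple store when RETURNING is used. A hedged example of a statement served by this parallel path, assuming the tests' distributed_table and the default parallel multi_shard_modify_mode:

-- Multi-shard UPDATE with RETURNING: tasks run in parallel and the returned
-- rows are buffered in the custom scan's tuple store before being emitted.
UPDATE distributed_table SET value_1 = value_1 + 1 RETURNING id, value_1;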

View File

@@ -37,9 +37,8 @@ extern bool AllModificationsCommutative;
 extern bool EnableDeadlockPrevention;
 
 extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
-extern TupleTableSlot * RouterSequentialModifyExecScan(CustomScanState *node);
 extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
-extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
+extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
 extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
 extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,

View File

@@ -1143,10 +1143,155 @@ DEBUG: verifying table "test_table_2"
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_2, test_table_1;
COMMIT;
-- make sure that modifications to reference tables in a CTE can
-- set the mode to sequential for the next operations
CREATE TABLE reference_table(id int PRIMARY KEY);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table"
DEBUG: building index "reference_table_pkey" on table "reference_table"
SELECT create_reference_table('reference_table');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
create_reference_table
------------------------
(1 row)
CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table"
DEBUG: building index "distributed_table_pkey" on table "distributed_table"
SELECT create_distributed_table('distributed_table', 'id');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
create_distributed_table
--------------------------
(1 row)
ALTER TABLE
distributed_table
ADD CONSTRAINT
fkey_delete FOREIGN KEY(value_1)
REFERENCES
reference_table(id) ON DELETE CASCADE;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- this query returns 100 rows in Postgres, but not in Citus
-- see https://github.com/citusdata/citus_docs/issues/664 for the discussion
WITH t1 AS (DELETE FROM reference_table RETURNING id)
DELETE FROM distributed_table USING t1 WHERE value_1 = t1.id RETURNING *;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 92_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 92 query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.distributed_table USING (SELECT intermediate_result.id FROM read_intermediate_result('92_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1 WHERE (distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) RETURNING distributed_table.id, distributed_table.value_1, t1.id
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
id | value_1 | id
----+---------+----
(0 rows)
-- load some more data for one more test with real-time selects
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- this query returns 100 rows in Postgres, but not in Citus
-- see https://github.com/citusdata/citus_docs/issues/664 for the discussion
WITH t1 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1 WHERE value_1 = t1.id;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 96_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 96 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('96_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1 WHERE (distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id)
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
count
-------
0
(1 row)
-- this query should fail since we first to a parallel access to a distributed table
-- with t1, and then access to t2
WITH t1 AS (DELETE FROM distributed_table RETURNING id),
t2 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1, t2 WHERE value_1 = t1.id AND value_1 = t2.id;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 98_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: generating subplan 98_2 for CTE t2: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 98 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('98_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1, (SELECT intermediate_result.id FROM read_intermediate_result('98_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t2 WHERE ((distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) AND (distributed_table.value_1 OPERATOR(pg_catalog.=) t2.id))
ERROR: cannot execute DML on reference relation "reference_table" because there was a parallel DML access to distributed relation "distributed_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
-- similarly this should fail since we first access to a distributed
-- table via t1, and then access to the reference table in the main query
WITH t1 AS (DELETE FROM distributed_table RETURNING id)
DELETE FROM reference_table RETURNING id;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 101_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: Plan 101 query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
ERROR: cannot execute DML on reference relation "reference_table" because there was a parallel DML access to distributed relation "distributed_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
-- finally, make sure that we can execute the same queries
-- in the sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH t1 AS (DELETE FROM distributed_table RETURNING id),
t2 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1, t2 WHERE value_1 = t1.id AND value_1 = t2.id;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 103_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: generating subplan 103_2 for CTE t2: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 103 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('103_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1, (SELECT intermediate_result.id FROM read_intermediate_result('103_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t2 WHERE ((distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) AND (distributed_table.value_1 OPERATOR(pg_catalog.=) t2.id))
count
-------
0
(1 row)
ROLLBACK;
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH t1 AS (DELETE FROM distributed_table RETURNING id)
DELETE FROM reference_table RETURNING id;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 106_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: Plan 106 query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
id
----
(0 rows)
ROLLBACK;
RESET client_min_messages;
DROP SCHEMA test_fkey_to_ref_in_tx CASCADE;
-NOTICE: drop cascades to 3 other objects
+NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table referece_table
drop cascades to table on_update_fkey_table
drop cascades to table unrelated_dist_table
drop cascades to table reference_table
drop cascades to table distributed_table
SET search_path TO public;

View File

@@ -1143,10 +1143,155 @@ DEBUG: verifying table "test_table_2"
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_2, test_table_1;
COMMIT;
-- make sure that modifications to reference tables in a CTE can
-- set the mode to sequential for the next operations
CREATE TABLE reference_table(id int PRIMARY KEY);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table"
DEBUG: building index "reference_table_pkey" on table "reference_table" serially
SELECT create_reference_table('reference_table');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
create_reference_table
------------------------
(1 row)
CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table"
DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially
SELECT create_distributed_table('distributed_table', 'id');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
create_distributed_table
--------------------------
(1 row)
ALTER TABLE
distributed_table
ADD CONSTRAINT
fkey_delete FOREIGN KEY(value_1)
REFERENCES
reference_table(id) ON DELETE CASCADE;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- this query returns 100 rows in Postgres, but not in Citus
-- see https://github.com/citusdata/citus_docs/issues/664 for the discussion
WITH t1 AS (DELETE FROM reference_table RETURNING id)
DELETE FROM distributed_table USING t1 WHERE value_1 = t1.id RETURNING *;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 92_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 92 query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.distributed_table USING (SELECT intermediate_result.id FROM read_intermediate_result('92_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1 WHERE (distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) RETURNING distributed_table.id, distributed_table.value_1, t1.id
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
id | value_1 | id
----+---------+----
(0 rows)
-- load some more data for one more test with real-time selects
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- this query returns 100 rows in Postgres, but not in Citus
-- see https://github.com/citusdata/citus_docs/issues/664 for the discussion
WITH t1 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1 WHERE value_1 = t1.id;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 96_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 96 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('96_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1 WHERE (distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id)
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
count
-------
0
(1 row)
-- this query should fail since we first to a parallel access to a distributed table
-- with t1, and then access to t2
WITH t1 AS (DELETE FROM distributed_table RETURNING id),
t2 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1, t2 WHERE value_1 = t1.id AND value_1 = t2.id;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 98_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: generating subplan 98_2 for CTE t2: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 98 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('98_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1, (SELECT intermediate_result.id FROM read_intermediate_result('98_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t2 WHERE ((distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) AND (distributed_table.value_1 OPERATOR(pg_catalog.=) t2.id))
ERROR: cannot execute DML on reference relation "reference_table" because there was a parallel DML access to distributed relation "distributed_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
-- similarly this should fail since we first access to a distributed
-- table via t1, and then access to the reference table in the main query
WITH t1 AS (DELETE FROM distributed_table RETURNING id)
DELETE FROM reference_table RETURNING id;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 101_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: Plan 101 query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
ERROR: cannot execute DML on reference relation "reference_table" because there was a parallel DML access to distributed relation "distributed_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
-- finally, make sure that we can execute the same queries
-- in the sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH t1 AS (DELETE FROM distributed_table RETURNING id),
t2 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1, t2 WHERE value_1 = t1.id AND value_1 = t2.id;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 103_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: generating subplan 103_2 for CTE t2: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan 103 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('103_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1, (SELECT intermediate_result.id FROM read_intermediate_result('103_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t2 WHERE ((distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) AND (distributed_table.value_1 OPERATOR(pg_catalog.=) t2.id))
count
-------
0
(1 row)
ROLLBACK;
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH t1 AS (DELETE FROM distributed_table RETURNING id)
DELETE FROM reference_table RETURNING id;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 106_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: Plan 106 query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
id
----
(0 rows)
ROLLBACK;
RESET client_min_messages;
DROP SCHEMA test_fkey_to_ref_in_tx CASCADE;
-NOTICE: drop cascades to 3 other objects
+NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table referece_table
drop cascades to table on_update_fkey_table
drop cascades to table unrelated_dist_table
drop cascades to table reference_table
drop cascades to table distributed_table
SET search_path TO public;

View File

@@ -572,6 +572,69 @@ BEGIN;
DROP TABLE test_table_2, test_table_1;
COMMIT;
-- make sure that modifications to reference tables in a CTE can
-- set the mode to sequential for the next operations
CREATE TABLE reference_table(id int PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('distributed_table', 'id');
ALTER TABLE
distributed_table
ADD CONSTRAINT
fkey_delete FOREIGN KEY(value_1)
REFERENCES
reference_table(id) ON DELETE CASCADE;
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
-- this query returns 100 rows in Postgres, but not in Citus
-- see https://github.com/citusdata/citus_docs/issues/664 for the discussion
WITH t1 AS (DELETE FROM reference_table RETURNING id)
DELETE FROM distributed_table USING t1 WHERE value_1 = t1.id RETURNING *;
-- load some more data for one more test with real-time selects
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
-- this query returns 100 rows in Postgres, but not in Citus
-- see https://github.com/citusdata/citus_docs/issues/664 for the discussion
WITH t1 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1 WHERE value_1 = t1.id;
-- this query should fail since we first to a parallel access to a distributed table
-- with t1, and then access to t2
WITH t1 AS (DELETE FROM distributed_table RETURNING id),
t2 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1, t2 WHERE value_1 = t1.id AND value_1 = t2.id;
-- similarly this should fail since we first access to a distributed
-- table via t1, and then access to the reference table in the main query
WITH t1 AS (DELETE FROM distributed_table RETURNING id)
DELETE FROM reference_table RETURNING id;
-- finally, make sure that we can execute the same queries
-- in the sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH t1 AS (DELETE FROM distributed_table RETURNING id),
t2 AS (DELETE FROM reference_table RETURNING id)
SELECT count(*) FROM distributed_table, t1, t2 WHERE value_1 = t1.id AND value_1 = t2.id;
ROLLBACK;
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH t1 AS (DELETE FROM distributed_table RETURNING id)
DELETE FROM reference_table RETURNING id;
ROLLBACK;
RESET client_min_messages;
DROP SCHEMA test_fkey_to_ref_in_tx CASCADE;