Enforce foreign key restrictions inside transaction blocks

When a hash distributed table have a foreign key to a reference
table, there are few restrictions we have to apply in order to
prevent distributed deadlocks or reading wrong results.

The necessity to apply the restrictions arise from cascading
nature of foreign keys. When a foreign key on a reference table
cascades to a distributed table, a single operation over a single
connection can acquire locks on multiple shards of the distributed
table. Thus, any parallel operation on that distributed table, in the
same transaction should not open parallel connections to the shards.
Otherwise, we'd either end-up with a self-distributed deadlock or
read wrong results.

As briefly described above, the restrictions that we apply is done
by tracking the distributed/reference relation accesses inside
transaction blocks, and act accordingly when necessary.

The two main rules are as follows:
   - Whenever a parallel distributed relation access conflicts
     with a consecutive reference relation access, Citus errors
     out
   - Whenever a reference relation access is followed by a
     conflicting parallel relation access, the execution mode
     is switched to sequential mode.

There are also some other notes to mention:
   - If the user does SET LOCAL citus.multi_shard_modify_mode
     TO 'sequential';, all the queries should simply work with
     using one connection per worker and sequentially executing
     the commands. That's obviously a slower approach than Citus'
     usual parallel execution. However, we've at least have a way
     to run all commands successfully.

   - If an unrelated parallel query executed on any distributed
     table, we cannot switch to sequential mode. Because, the essense
     of sequential mode is using one connection per worker. However,
     in the presence of a parallel connection, the connection manager
     picks those connections to execute the commands. That contradicts
     with our purpose, thus we error out.

   - COPY to a distributed table cannot be executed in sequential mode.
     Thus, if we switch to sequential mode and COPY is executed, the
     operation fails and there is currently no way of implementing that.
     Note that, when the local table is not empty and create_distributed_table
     is used, citus uses COPY internally. Thus, in those cases,
     create_distributed_table() will also fail.

   - There is a GUC called citus.enforce_foreign_key_restrictions
     to disable all the checks. We added that GUC since the restrictions
     we apply is sometimes a bit more restrictive than its necessary.
     The user might want to relax those. Similarly, if you don't have
     CASCADEing reference tables, you might consider disabling all the
     checks.
pull/2240/head
Onder Kalaci 2018-06-27 11:36:58 +03:00 committed by mehmet furkan şahin
parent 6be6911ed9
commit d83be3a33f
13 changed files with 2327 additions and 34 deletions

View File

@ -47,6 +47,7 @@
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
@ -1120,6 +1121,26 @@ CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
"case, try distributing local tables when they "
"are empty.")));
}
else if (shouldRunSequential && ParallelQueryExecutedInTransaction())
{
char *relationName = get_rel_name(relationId);
/*
* If there has already been a parallel query executed, the sequential mode
* would still use the already opened parallel connections to the workers,
* thus contradicting our purpose of using sequential mode.
*/
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
"transaction because it has a foreign key to "
"a reference table", relationName),
errdetail("If a hash distributed table has a foreign key "
"to a reference table, it has to be created "
"in sequential mode before any parallel commands "
"have been executed in the same transaction"),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
else if (shouldRunSequential)
{
return false;

View File

@ -2324,6 +2324,9 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
/* mark as multi shard to skip doing the same thing over and over */
copyDest->multiShardCopy = true;
/* error out of conflicting COPY */
CheckConflictingParallelCopyAccesses(relationId);
/* when we see multiple shard connections, we mark COPY as parallel modify */
RecordParallelModifyAccess(relationId);
}

View File

@ -49,6 +49,7 @@
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
#include "distributed/pg_dist_partition.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/transmit.h"
@ -4042,5 +4043,29 @@ ShouldExecuteAlterTableSequentially(Oid relationId, AlterTableCmd *command)
}
}
/*
* If there has already been a parallel query executed, the sequential mode
* would still use the already opened parallel connections to the workers for
* the distributed tables, thus contradicting our purpose of using
* sequential mode.
*/
if (executeSequentially && IsDistributedTable(relationId) &&
PartitionMethod(relationId) != DISTRIBUTE_BY_NONE &&
ParallelQueryExecutedInTransaction())
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("cannot modify table \"%s\" because there "
"was a parallel operation on a distributed table"
"in the transaction", relationName),
errdetail("When there is a foreign key to a reference "
"table, Citus needs to perform all operations "
"over a single connection per node to ensure "
"consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
return executeSequentially;
}

View File

@ -38,6 +38,7 @@
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/query_stats.h"
#include "distributed/remote_commands.h"
@ -401,6 +402,20 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enforce_foreign_key_restrictions",
gettext_noop("Enforce restrictions while querying distributed/reference "
"tables with foreign keys"),
gettext_noop("When enabled, cascading modifications from reference tables "
"to distributed tables are traced and acted accordingly "
"to avoid creating distributed deadlocks and ensure correctness."),
&EnforceForeignKeyRestrictions,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.subquery_pushdown",
gettext_noop("Enables supported subquery pushdown to workers."),

View File

@ -20,14 +20,27 @@
#include "access/xact.h"
#include "distributed/colocation_utils.h"
#include "distributed/hash_helpers.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/relation_access_tracking.h"
#include "utils/hsearch.h"
#include "utils/lsyscache.h"
/* Config variables managed via guc.c */
bool EnforceForeignKeyRestrictions = true;
#define PARALLEL_MODE_FLAG_OFFSET 3
#define PARALLEL_ACCESS_MASK (int) (0 | \
(1 << (PLACEMENT_ACCESS_SELECT + \
PARALLEL_MODE_FLAG_OFFSET)) | \
(1 << (PLACEMENT_ACCESS_DML + \
PARALLEL_MODE_FLAG_OFFSET)) | \
(1 << (PLACEMENT_ACCESS_DDL + \
PARALLEL_MODE_FLAG_OFFSET)))
/*
* Hash table mapping relations to the
@ -66,6 +79,7 @@ typedef struct RelationAccessHashEntry
static HTAB *RelationAccessHash;
/* functions related to access recording */
static void RecordRelationAccess(Oid relationId, ShardPlacementAccessType accessType);
static void RecordPlacementAccessToCache(Oid relationId,
ShardPlacementAccessType accessType);
@ -76,6 +90,26 @@ static void RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessTyp
static void RecordParallelRelationAccessToCache(Oid relationId,
ShardPlacementAccessType placementAccess);
/* functions related to access conflict checks */
static char * PlacementAccessTypeToText(ShardPlacementAccessType accessType);
static void CheckConflictingRelationAccesses(Oid relationId,
ShardPlacementAccessType accessType);
static bool HoldsConflictingLockWithReferencingRelations(Oid relationId,
ShardPlacementAccessType
placementAccess,
Oid *conflictingRelationId,
ShardPlacementAccessType *
conflictingAccessMode);
static void CheckConflictingParallelRelationAccesses(Oid relationId,
ShardPlacementAccessType
accessType);
static bool HoldsConflictingLockWithReferencedRelations(Oid relationId,
ShardPlacementAccessType
placementAccess,
Oid *conflictingRelationId,
ShardPlacementAccessType *
conflictingAccessMode);
/*
* Empty RelationAccessHash, without destroying the hash table itself.
@ -131,6 +165,40 @@ AssociatePlacementAccessWithRelation(ShardPlacement *placement,
}
/*
* PlacementAccessTypeToText converts ShardPlacementAccessType to
* text representation.
*/
static char *
PlacementAccessTypeToText(ShardPlacementAccessType accessType)
{
switch (accessType)
{
case PLACEMENT_ACCESS_SELECT:
{
return "SELECT";
break;
}
case PLACEMENT_ACCESS_DML:
{
return "DML";
}
case PLACEMENT_ACCESS_DDL:
{
return "DDL";
}
default:
{
return "None";
break;
}
}
}
/*
* RecordRelationAccess associates the access to the distributed relation. The
* function takes partitioned relations into account as well.
@ -142,6 +210,9 @@ AssociatePlacementAccessWithRelation(ShardPlacement *placement,
static void
RecordRelationAccess(Oid relationId, ShardPlacementAccessType accessType)
{
/* make sure that this is not a conflicting access */
CheckConflictingRelationAccesses(relationId, accessType);
/*
* If a relation is partitioned, record accesses to all of its partitions as well.
* We prefer to use PartitionedTableNoLock() because at this point the necessary
@ -374,6 +445,9 @@ RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType placementA
return;
}
/* act accordingly if it's a conflicting access */
CheckConflictingParallelRelationAccesses(relationId, placementAccess);
/*
* If a relation is partitioned, record accesses to all of its partitions as well.
* We prefer to use PartitionedTableNoLock() because at this point the necessary
@ -434,6 +508,40 @@ RecordParallelRelationAccessToCache(Oid relationId,
}
/*
* ParallelQueryExecutedInTransaction returns true if any parallel query
* is executed in the current transaction.
*/
bool
ParallelQueryExecutedInTransaction(void)
{
HASH_SEQ_STATUS status;
RelationAccessHashEntry *hashEntry;
if (!ShouldRecordRelationAccess() || RelationAccessHash == NULL)
{
return false;
}
hash_seq_init(&status, RelationAccessHash);
hashEntry = (RelationAccessHashEntry *) hash_seq_search(&status);
while (hashEntry != NULL)
{
int relationAccessMode = hashEntry->relationAccessMode;
if ((relationAccessMode & PARALLEL_ACCESS_MASK))
{
hash_seq_term(&status);
return true;
}
hashEntry = (RelationAccessHashEntry *) hash_seq_search(&status);
}
return false;
}
/*
* GetRelationSelectAccessMode is a wrapper around GetRelationAccessMode.
*/
@ -524,10 +632,414 @@ GetRelationAccessMode(Oid relationId, ShardPlacementAccessType accessType)
bool
ShouldRecordRelationAccess()
{
if (IsTransactionBlock() || InCoordinatedTransaction())
if (EnforceForeignKeyRestrictions &&
(IsTransactionBlock() || InCoordinatedTransaction()))
{
return true;
}
return false;
}
/*
* CheckConflictingRelationAccesses is mostly a wrapper around
* HoldsConflictingLockWithReferencingRelations(). We're only interested in accesses
* to reference tables that are referenced via a foreign constraint by a
* hash distributed tables.
*/
static void
CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType accessType)
{
DistTableCacheEntry *cacheEntry = NULL;
Oid conflictingReferencingRelationId = InvalidOid;
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
if (!EnforceForeignKeyRestrictions || !IsDistributedTable(relationId))
{
return;
}
cacheEntry = DistributedTableCacheEntry(relationId);
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
cacheEntry->referencingRelationsViaForeignKey != NIL))
{
return;
}
if (HoldsConflictingLockWithReferencingRelations(relationId, accessType,
&conflictingReferencingRelationId,
&conflictingAccessType))
{
char *relationName = get_rel_name(relationId);
char *conflictingRelationName = get_rel_name(conflictingReferencingRelationId);
char *accessTypeText = PlacementAccessTypeToText(accessType);
char *conflictingAccessTypeText =
PlacementAccessTypeToText(conflictingAccessType);
ereport(ERROR, (errmsg("cannot execute %s on reference relation \"%s\" because "
"there was a parallel %s access to distributed relation "
"\"%s\" in the same transaction",
accessTypeText, relationName, conflictingAccessTypeText,
conflictingRelationName),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
else if (cacheEntry->referencingRelationsViaForeignKey != NIL &&
accessType > PLACEMENT_ACCESS_SELECT)
{
char *relationName = get_rel_name(relationId);
if (ParallelQueryExecutedInTransaction())
{
/*
* If there has already been a parallel query executed, the sequential mode
* would still use the already opened parallel connections to the workers,
* thus contradicting our purpose of using sequential mode.
*/
ereport(ERROR, (errmsg("cannot modify reference table \"%s\" because there "
"was a parallel operation on a distributed table",
relationName),
errdetail("When there is a foreign key to a reference "
"table, Citus needs to perform all operations "
"over a single connection per node to ensure "
"consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
else if (MultiShardConnectionType == PARALLEL_CONNECTION)
{
/*
* We can still continue with multi-shard queries in sequential mode, so
* set it.
*/
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"Reference relation \"%s\" is modified, which might lead "
"to data inconsistencies or distributed deadlocks via "
"parallel accesses to hash distributed relations due to "
"foreign keys. Any parallel modification to "
"those hash distributed relations in the same "
"transaction can only be executed in sequential query "
"execution mode", relationName)));
/*
* Switching to sequential mode is admittedly confusing and, could be useless
* and less performant in some cases. However, if we do not switch to
* sequential mode at this point, we'd loose the opportunity to do so
* later when a parallel query is executed on the hash distributed relations
* that are referencing this reference table.
*/
SetLocalMultiShardModifyModeToSequential();
}
}
}
/*
* CheckConflictingParallelRelationAccesses is mostly a wrapper around
* HoldsConflictingLockWithReferencedRelations(). We're only interested in parallel
* accesses to distributed tables that refers reference tables via foreign constraint.
*
*/
static void
CheckConflictingParallelRelationAccesses(Oid relationId, ShardPlacementAccessType
accessType)
{
DistTableCacheEntry *cacheEntry = NULL;
Oid conflictingReferencingRelationId = InvalidOid;
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
if (!EnforceForeignKeyRestrictions || !IsDistributedTable(relationId))
{
return;
}
cacheEntry = DistributedTableCacheEntry(relationId);
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->referencedRelationsViaForeignKey != NIL))
{
return;
}
if (MultiShardConnectionType == PARALLEL_CONNECTION &&
HoldsConflictingLockWithReferencedRelations(relationId, accessType,
&conflictingReferencingRelationId,
&conflictingAccessType))
{
char *relationName = get_rel_name(relationId);
char *conflictingRelationName = get_rel_name(conflictingReferencingRelationId);
char *accessTypeText = PlacementAccessTypeToText(accessType);
char *conflictingAccessTypeText =
PlacementAccessTypeToText(conflictingAccessType);
if (ParallelQueryExecutedInTransaction())
{
/*
* If there has already been a parallel query executed, the sequential mode
* would still use the already opened parallel connections to the workers,
* thus contradicting our purpose of using sequential mode.
*/
ereport(ERROR, (errmsg("cannot execute parallel %s on relation \"%s\" "
"after %s command on reference relation "
"\"%s\" because there is a foreign key between "
"them and \"%s\" has been accessed in this transaction",
accessTypeText, relationName,
conflictingAccessTypeText, conflictingRelationName,
conflictingRelationName),
errdetail("When there is a foreign key to a reference "
"table, Citus needs to perform all operations "
"over a single connection per node to ensure "
"consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
else
{
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail("cannot execute parallel %s on relation \"%s\" "
"after %s command on reference relation "
"\"%s\" because there is a foreign key between "
"them and \"%s\" has been accessed in this transaction",
accessTypeText, relationName,
conflictingAccessTypeText, conflictingRelationName,
conflictingRelationName)));
SetLocalMultiShardModifyModeToSequential();
}
}
}
/*
* CheckConflictingParallelCopyAccesses is mostly a wrapper around
* HoldsConflictingLockWithReferencedRelations(). We're only interested in parallel
* accesses to distributed tables that refers reference tables via foreign constraint.
* Since COPY cannot be used in sequential mode, we're erroring out.
*/
void
CheckConflictingParallelCopyAccesses(Oid relationId)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
Oid conflictingReferencingRelationId = InvalidOid;
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->referencedRelationsViaForeignKey != NIL))
{
return;
}
if (HoldsConflictingLockWithReferencedRelations(relationId, PLACEMENT_ACCESS_DML,
&conflictingReferencingRelationId,
&conflictingAccessType))
{
char *relationName = get_rel_name(relationId);
char *conflictingRelationName = get_rel_name(conflictingReferencingRelationId);
char *conflictingAccessTypeText =
PlacementAccessTypeToText(conflictingAccessType);
ereport(ERROR, (errmsg("cannot execute parallel COPY on relation \"%s\" "
"after %s command on reference relation "
"\"%s\" because there is a foreign key between "
"them and \"%s\" has been modified in this transaction",
relationName, conflictingAccessTypeText,
conflictingRelationName, conflictingRelationName),
errdetail("COPY to a distributed table uses a separate set of "
"connections which will not be able to see the "
"uncommitted changes to the reference table."),
errhint("Perform the COPY in a separate transaction.")));
}
}
/*
* HoldsConflictingLockWithReferencedRelations returns true if the input relationId is a
* hash distributed table and it holds any conflicting locks with the reference tables that
* the distributed table has a foreign key to the reference table.
*/
static bool
HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccessType
placementAccess,
Oid *conflictingRelationId,
ShardPlacementAccessType *
conflictingAccessMode)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
ListCell *referencedRelationCell = NULL;
foreach(referencedRelationCell, cacheEntry->referencedRelationsViaForeignKey)
{
Oid referencedRelation = lfirst_oid(referencedRelationCell);
RelationAccessMode selectMode = RELATION_NOT_ACCESSED;
RelationAccessMode dmlMode = RELATION_NOT_ACCESSED;
RelationAccessMode ddlMode = RELATION_NOT_ACCESSED;
/* we're only interested in foreign keys to reference tables */
if (PartitionMethod(referencedRelation) != DISTRIBUTE_BY_NONE)
{
continue;
}
/*
* A select on a reference table could conflict with a DDL
* on a distributed table.
*/
selectMode = GetRelationSelectAccessMode(referencedRelation);
if (placementAccess == PLACEMENT_ACCESS_DDL &&
selectMode != RELATION_NOT_ACCESSED)
{
*conflictingRelationId = referencedRelation;
*conflictingAccessMode = PLACEMENT_ACCESS_SELECT;
return true;
}
/*
* Both DML and DDL operations on a reference table conflicts with
* any parallel operation on distributed tables.
*/
dmlMode = GetRelationDMLAccessMode(referencedRelation);
if (dmlMode != RELATION_NOT_ACCESSED)
{
*conflictingRelationId = referencedRelation;
*conflictingAccessMode = PLACEMENT_ACCESS_DML;
return true;
}
ddlMode = GetRelationDDLAccessMode(referencedRelation);
if (ddlMode != RELATION_NOT_ACCESSED)
{
*conflictingRelationId = referencedRelation;
*conflictingAccessMode = PLACEMENT_ACCESS_DDL;
return true;
}
}
return false;
}
/*
* HoldsConflictingLockWithReferencingRelations returns true when the input relationId is a
* reference table and it holds any conflicting locks with the distributed tables where
* the distributed table has a foreign key to the reference table.
*
* If returns true, the referencing relation and conflictingAccessMode are also set.
*/
static bool
HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAccessType
placementAccess, Oid *conflictingRelationId,
ShardPlacementAccessType *
conflictingAccessMode)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
ListCell *referencingRelationCell = NULL;
bool holdsConflictingLocks = false;
Assert(PartitionMethod(relationId) == DISTRIBUTE_BY_NONE);
foreach(referencingRelationCell, cacheEntry->referencingRelationsViaForeignKey)
{
Oid referencingRelation = lfirst_oid(referencingRelationCell);
/*
* We're only interested in foreign keys to reference tables from
* hash distributed tables.
*/
if (!IsDistributedTable(referencingRelation) ||
PartitionMethod(referencingRelation) != DISTRIBUTE_BY_HASH)
{
continue;
}
/*
* Rules that we apply:
* - SELECT on a reference might table conflict with
* a previous parallel DDL on a distributed table
* - DML on a reference table might conflict with
* a previous parallel DML or DDL on a distributed
* table
* - DDL on a reference table might conflict with
* a parellel SELECT, DML or DDL on a distributed
* table
*/
if (placementAccess == PLACEMENT_ACCESS_SELECT)
{
RelationAccessMode ddlMode = GetRelationDDLAccessMode(referencingRelation);
if (ddlMode == RELATION_PARALLEL_ACCESSED)
{
/* SELECT on a distributed table conflicts with DDL / TRUNCATE */
holdsConflictingLocks = true;
*conflictingAccessMode = PLACEMENT_ACCESS_DDL;
}
}
else if (placementAccess == PLACEMENT_ACCESS_DML)
{
RelationAccessMode ddlMode = RELATION_NOT_ACCESSED;
RelationAccessMode dmlMode = GetRelationDMLAccessMode(referencingRelation);
if (dmlMode == RELATION_PARALLEL_ACCESSED)
{
holdsConflictingLocks = true;
*conflictingAccessMode = PLACEMENT_ACCESS_DML;
}
ddlMode = GetRelationDDLAccessMode(referencingRelation);
if (ddlMode == RELATION_PARALLEL_ACCESSED)
{
/* SELECT on a distributed table conflicts with DDL / TRUNCATE */
holdsConflictingLocks = true;
*conflictingAccessMode = PLACEMENT_ACCESS_DDL;
}
}
else if (placementAccess == PLACEMENT_ACCESS_DDL)
{
RelationAccessMode selectMode = RELATION_NOT_ACCESSED;
RelationAccessMode ddlMode = RELATION_NOT_ACCESSED;
RelationAccessMode dmlMode = RELATION_NOT_ACCESSED;
selectMode = GetRelationSelectAccessMode(referencingRelation);
if (selectMode == RELATION_PARALLEL_ACCESSED)
{
holdsConflictingLocks = true;
*conflictingAccessMode = PLACEMENT_ACCESS_SELECT;
}
dmlMode = GetRelationDMLAccessMode(referencingRelation);
if (dmlMode == RELATION_PARALLEL_ACCESSED)
{
holdsConflictingLocks = true;
*conflictingAccessMode = PLACEMENT_ACCESS_DML;
}
ddlMode = GetRelationDDLAccessMode(referencingRelation);
if (ddlMode == RELATION_PARALLEL_ACCESSED)
{
holdsConflictingLocks = true;
*conflictingAccessMode = PLACEMENT_ACCESS_DDL;
}
}
if (holdsConflictingLocks)
{
*conflictingRelationId = referencingRelation;
return true;
}
}
return false;
}

View File

@ -2988,7 +2988,7 @@ InvalidateForeignKeyGraph(void)
{
CitusInvalidateRelcacheByRelid(DistColocationRelationId());
/* bump command counter, to force invalidation to take effect */
/* bump command counter to force invalidation to take effect */
CommandCounterIncrement();
}

View File

@ -13,6 +13,10 @@
#include "distributed/multi_physical_planner.h" /* access Task struct */
#include "distributed/placement_connection.h"
/* Config variables managed via guc.c */
extern bool EnforceForeignKeyRestrictions;
/* forward declare, to avoid dependency on ShardPlacement definition */
struct ShardPlacement;
@ -37,6 +41,8 @@ extern RelationAccessMode GetRelationDDLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationDMLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationSelectAccessMode(Oid relationId);
extern bool ShouldRecordRelationAccess(void);
extern void CheckConflictingParallelCopyAccesses(Oid relationId);
extern bool ParallelQueryExecutedInTransaction(void);
#endif /* RELATION_ACCESS_TRACKING_H_ */

File diff suppressed because it is too large Load Diff

View File

@ -1267,7 +1267,6 @@ DETAIL: drop cascades to constraint test_table_2_id_fkey on table test_table_2
drop cascades to constraint test_table_3_value_1_fkey on table test_table_3
ROLLBACK;
-- create_reference_table, create_distributed_table and ALTER TABLE in the same transaction
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
@ -1284,12 +1283,13 @@ BEGIN;
(1 row)
ALTER TABLE test_table_2 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_1(id);
ERROR: cannot perform query with placements that were modified over multiple connections
ERROR: cannot modify table "test_table_2" because there was a parallel operation on a distributed tablein the transaction
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
DROP TABLE test_table_1, test_table_2;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- the order of create_reference_table and create_distributed_table is changed
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_1', 'id');
@ -1306,7 +1306,9 @@ BEGIN;
(1 row)
ALTER TABLE test_table_1 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_2(id);
ERROR: cannot perform query with placements that were modified over multiple connections
ERROR: cannot modify table "test_table_1" because there was a parallel operation on a distributed tablein the transaction
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
DROP TABLE test_table_2 CASCADE;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
@ -1331,7 +1333,6 @@ HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', t
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- make sure that other DDLs/DMLs also work fine
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id));
@ -1349,9 +1350,7 @@ BEGIN;
CREATE INDEX i1 ON test_table_1(id);
ALTER TABLE test_table_2 ADD CONSTRAINT check_val CHECK (id > 0);
ERROR: cannot establish a new connection for placement 7000388, since DDL has been executed on a connection that is in use
DROP TABLE test_table_2, test_table_1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- The following tests check if the DDLs affecting foreign keys work as expected
-- check if we can drop the foreign constraint
@ -1384,7 +1383,6 @@ SELECT count(*) FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_reference_tab
DROP TABLE test_table_1, test_table_2;
-- check if we can drop the foreign constraint in a transaction right after ADD CONSTRAINT
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
@ -1401,7 +1399,9 @@ BEGIN;
(1 row)
ALTER TABLE test_table_2 ADD CONSTRAINT foreign_key FOREIGN KEY(value_1) REFERENCES test_table_1(id);
ERROR: cannot perform query with placements that were modified over multiple connections
ERROR: cannot modify table "test_table_2" because there was a parallel operation on a distributed tablein the transaction
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ALTER TABLE test_table_2 DROP CONSTRAINT test_table_2_value_1_fkey;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
@ -1770,7 +1770,6 @@ SELECT * FROM test_table_1;
DROP TABLE test_table_1, test_table_2;
-- check if we successfuly set multi_shard_modify_mode to sequential after sequentially running DDLs
-- in transaction since the upcoming DDLs need to run sequentially.
-- FIXME: fails for now
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
CREATE TABLE test_table_3(id int PRIMARY KEY, value_1 int);
@ -1795,11 +1794,9 @@ SELECT create_distributed_table('test_table_3', 'id');
BEGIN;
ALTER TABLE test_table_2 ADD CONSTRAINT fkey FOREIGN KEY (value_1) REFERENCES test_table_1(id);
ALTER TABLE test_table_3 ADD COLUMN test_column int;
ERROR: cannot establish a new connection for placement 7000556, since DDL has been executed on a connection that is in use
ALTER TABLE test_table_1 DROP COLUMN id CASCADE;
ERROR: current transaction is aborted, commands ignored until end of transaction block
NOTICE: drop cascades to constraint fkey on table test_table_2
ALTER TABLE test_table_1 ADD COLUMN id int;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
DROP TABLE test_table_1, test_table_2, test_table_3;
-- NOTE: Postgres does not support foreign keys on partitioned tables currently.

View File

@ -1267,7 +1267,6 @@ DETAIL: drop cascades to constraint test_table_2_id_fkey on table test_table_2
drop cascades to constraint test_table_3_value_1_fkey on table test_table_3
ROLLBACK;
-- create_reference_table, create_distributed_table and ALTER TABLE in the same transaction
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
@ -1284,12 +1283,13 @@ BEGIN;
(1 row)
ALTER TABLE test_table_2 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_1(id);
ERROR: cannot perform query with placements that were modified over multiple connections
ERROR: cannot modify table "test_table_2" because there was a parallel operation on a distributed tablein the transaction
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
DROP TABLE test_table_1, test_table_2;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- the order of create_reference_table and create_distributed_table is changed
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_1', 'id');
@ -1306,7 +1306,9 @@ BEGIN;
(1 row)
ALTER TABLE test_table_1 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_2(id);
ERROR: cannot perform query with placements that were modified over multiple connections
ERROR: cannot modify table "test_table_1" because there was a parallel operation on a distributed tablein the transaction
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
DROP TABLE test_table_2 CASCADE;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
@ -1331,7 +1333,6 @@ HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', t
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- make sure that other DDLs/DMLs also work fine
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id));
@ -1349,9 +1350,7 @@ BEGIN;
CREATE INDEX i1 ON test_table_1(id);
ALTER TABLE test_table_2 ADD CONSTRAINT check_val CHECK (id > 0);
ERROR: cannot establish a new connection for placement 7000388, since DDL has been executed on a connection that is in use
DROP TABLE test_table_2, test_table_1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- The following tests check if the DDLs affecting foreign keys work as expected
-- check if we can drop the foreign constraint
@ -1384,7 +1383,6 @@ SELECT count(*) FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_reference_tab
DROP TABLE test_table_1, test_table_2;
-- check if we can drop the foreign constraint in a transaction right after ADD CONSTRAINT
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
@ -1401,7 +1399,9 @@ BEGIN;
(1 row)
ALTER TABLE test_table_2 ADD CONSTRAINT foreign_key FOREIGN KEY(value_1) REFERENCES test_table_1(id);
ERROR: cannot perform query with placements that were modified over multiple connections
ERROR: cannot modify table "test_table_2" because there was a parallel operation on a distributed tablein the transaction
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ALTER TABLE test_table_2 DROP CONSTRAINT test_table_2_value_1_fkey;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
@ -1770,7 +1770,6 @@ SELECT * FROM test_table_1;
DROP TABLE test_table_1, test_table_2;
-- check if we successfuly set multi_shard_modify_mode to sequential after sequentially running DDLs
-- in transaction since the upcoming DDLs need to run sequentially.
-- FIXME: fails for now
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
CREATE TABLE test_table_3(id int PRIMARY KEY, value_1 int);
@ -1795,11 +1794,9 @@ SELECT create_distributed_table('test_table_3', 'id');
BEGIN;
ALTER TABLE test_table_2 ADD CONSTRAINT fkey FOREIGN KEY (value_1) REFERENCES test_table_1(id);
ALTER TABLE test_table_3 ADD COLUMN test_column int;
ERROR: cannot establish a new connection for placement 7000556, since DDL has been executed on a connection that is in use
ALTER TABLE test_table_1 DROP COLUMN id CASCADE;
ERROR: current transaction is aborted, commands ignored until end of transaction block
NOTICE: drop cascades to constraint fkey on table test_table_2
ALTER TABLE test_table_1 ADD COLUMN id int;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
DROP TABLE test_table_1, test_table_2, test_table_3;
-- NOTE: Postgres does not support foreign keys on partitioned tables currently.

View File

@ -69,7 +69,7 @@ test: multi_null_minmax_value_pruning
test: multi_query_directory_cleanup
test: multi_task_assignment_policy multi_cross_shard
test: multi_utility_statements
test: multi_dropped_column_aliases
test: multi_dropped_column_aliases foreign_key_restriction_enforcement
test: multi_binary_master_copy_format
# ----------

View File

@ -0,0 +1,577 @@
--
-- Tests multiple commands in transactions where
-- there is foreign key relation between reference
-- tables and distributed tables
--
CREATE SCHEMA test_fkey_to_ref_in_tx;
SET search_path TO 'test_fkey_to_ref_in_tx';
SET citus.next_shard_id TO 2380000;
SET citus.next_placement_id TO 2380000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE referece_table(id int PRIMARY KEY);
SELECT create_reference_table('referece_table');
CREATE TABLE on_update_fkey_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('on_update_fkey_table', 'id');
CREATE TABLE unrelated_dist_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('unrelated_dist_table', 'id');
ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referece_table(id) ON UPDATE CASCADE;
INSERT INTO referece_table SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO unrelated_dist_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- in order to see when the mode automatically swithces to sequential execution
SET client_min_messages TO DEBUG1;
-- case 1.1: SELECT to a reference table is followed by a parallel SELECT to a distributed table
BEGIN;
SELECT count(*) FROM referece_table;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
-- case 1.2: SELECT to a reference table is followed by a multiple router SELECTs to a distributed table
BEGIN;
SELECT count(*) FROM referece_table;
SELECT count(*) FROM on_update_fkey_table WHERE id = 15;
SELECT count(*) FROM on_update_fkey_table WHERE id = 16;
SELECT count(*) FROM on_update_fkey_table WHERE id = 17;
SELECT count(*) FROM on_update_fkey_table WHERE id = 18;
ROLLBACK;
-- case 1.3: SELECT to a reference table is followed by a multi-shard UPDATE to a distributed table
BEGIN;
SELECT count(*) FROM referece_table;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
ROLLBACK;
-- case 1.4: SELECT to a reference table is followed by a multiple sing-shard UPDATE to a distributed table
BEGIN;
SELECT count(*) FROM referece_table;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE id = 15;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE id = 16;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE id = 17;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE id = 18;
ROLLBACK;
-- case 1.5: SELECT to a reference table is followed by a DDL that touches fkey column
BEGIN;
SELECT count(*) FROM referece_table;
ALTER TABLE on_update_fkey_table ALTER COLUMN value_1 SET DATA TYPE bigint;
ROLLBACK;
-- case 1.6: SELECT to a reference table is followed by an unrelated DDL
BEGIN;
SELECT count(*) FROM referece_table;
ALTER TABLE on_update_fkey_table ADD COLUMN X INT;
ROLLBACK;
-- case 1.7.1: SELECT to a reference table is followed by a DDL that is on
-- the foreign key column
BEGIN;
SELECT count(*) FROM referece_table;
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
ALTER TABLE on_update_fkey_table DROP COLUMN value_1 CASCADE;
ROLLBACK;
-- case 1.7.2: SELECT to a reference table is followed by a DDL that is on
-- the foreign key column after a parallel query has been executed
BEGIN;
SELECT count(*) FROM unrelated_dist_table;
SELECT count(*) FROM referece_table;
ALTER TABLE on_update_fkey_table DROP COLUMN value_1 CASCADE;
ROLLBACK;
-- case 1.7.3: SELECT to a reference table is followed by a DDL that is not on
-- the foreign key column, and a parallel query has already been executed
BEGIN;
SELECT count(*) FROM unrelated_dist_table;
SELECT count(*) FROM referece_table;
ALTER TABLE on_update_fkey_table ADD COLUMN X INT;
ROLLBACK;
-- case 1.8: SELECT to a reference table is followed by a COPY
BEGIN;
SELECT count(*) FROM referece_table;
COPY on_update_fkey_table FROM STDIN WITH CSV;
1001,99
1002,99
1003,99
1004,99
1005,99
\.
ROLLBACK;
-- case 2.1: UPDATE to a reference table is followed by a multi-shard SELECT
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 101;
ROLLBACK;
-- case 2.2: UPDATE to a reference table is followed by multiple router SELECT
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 101 AND id = 99;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 101 AND id = 199;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 101 AND id = 299;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 101 AND id = 399;
ROLLBACK;
-- case 2.3: UPDATE to a reference table is followed by a multi-shard UPDATE
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
UPDATE on_update_fkey_table SET value_1 = 15;
ROLLBACK;
-- case 2.4: UPDATE to a reference table is followed by multiple router UPDATEs
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 1;
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 2;
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 3;
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 4;
ROLLBACK;
-- case 2.5: UPDATE to a reference table is followed by a DDL that touches fkey column
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
ALTER TABLE on_update_fkey_table ALTER COLUMN value_1 SET DATA TYPE bigint;
ROLLBACK;
-- case 2.6: UPDATE to a reference table is followed by an unrelated DDL
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
ALTER TABLE on_update_fkey_table ADD COLUMN value_1_X INT;
ROLLBACK;
-- case 2.7: UPDATE to a reference table is followed by COPY
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
COPY on_update_fkey_table FROM STDIN WITH CSV;
1001,99
1002,99
1003,99
1004,99
1005,99
\.
ROLLBACK;
-- case 2.8: UPDATE to a reference table is followed by TRUNCATE
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- case 3.1: an unrelated DDL to a reference table is followed by a real-time SELECT
BEGIN;
ALTER TABLE referece_table ALTER COLUMN id SET DEFAULT 1001;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
-- case 3.2: DDL that touches fkey column to a reference table is followed by a real-time SELECT
BEGIN;
ALTER TABLE referece_table ALTER COLUMN id SET DATA TYPE int;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
-- case 3.3: DDL to a reference table followed by a multi shard UPDATE
BEGIN;
ALTER TABLE referece_table ALTER COLUMN id SET DEFAULT 1001;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
ROLLBACK;
-- case 3.4: DDL to a reference table followed by multiple router UPDATEs
BEGIN;
ALTER TABLE referece_table ALTER COLUMN id SET DEFAULT 1001;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 1;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 2;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 3;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 4;
ROLLBACK;
-- case 3.5: DDL to reference table followed by a DDL to dist table
BEGIN;
ALTER TABLE referece_table ALTER COLUMN id SET DATA TYPE smallint;
CREATE INDEX fkey_test_index_1 ON on_update_fkey_table(value_1);
ROLLBACK;
-- case 4.6: DDL to reference table followed by a DDL to dist table, both touching fkey columns
BEGIN;
ALTER TABLE referece_table ALTER COLUMN id SET DATA TYPE smallint;
ALTER TABLE on_update_fkey_table ALTER COLUMN value_1 SET DATA TYPE smallint;
ROLLBACK;
-- case 3.7: DDL to a reference table is followed by COPY
BEGIN;
ALTER TABLE referece_table ADD COLUMN X int;
COPY on_update_fkey_table FROM STDIN WITH CSV;
1001,99
1002,99
1003,99
1004,99
1005,99
\.
ROLLBACK;
-- case 3.8: DDL to a reference table is followed by TRUNCATE
BEGIN;
ALTER TABLE referece_table ADD COLUMN X int;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- case 3.9: DDL to a reference table is followed by TRUNCATE
BEGIN;
ALTER TABLE referece_table ALTER COLUMN id SET DATA TYPE smallint;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-----
--- Now, start testing the other way araound
-----
-- case 4.1: SELECT to a dist table is follwed by a SELECT to a reference table
BEGIN;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
SELECT count(*) FROM referece_table;
ROLLBACK;
-- case 4.2: SELECT to a dist table is follwed by a DML to a reference table
BEGIN;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
UPDATE referece_table SET id = 101 WHERE id = 99;
ROLLBACK;
-- case 4.3: SELECT to a dist table is follwed by an unrelated DDL to a reference table
BEGIN;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
ALTER TABLE referece_table ADD COLUMN X INT;
ROLLBACK;
-- case 4.4: SELECT to a dist table is follwed by a DDL to a reference table
BEGIN;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
ALTER TABLE referece_table ALTER COLUMN id SET DATA TYPE smallint;
ROLLBACK;
-- case 4.5: SELECT to a dist table is follwed by a TRUNCATE
BEGIN;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
TRUNCATE referece_table CASCADE;
ROLLBACK;
-- case 4.6: Router SELECT to a dist table is followed by a TRUNCATE
BEGIN;
SELECT count(*) FROM on_update_fkey_table WHERE id = 9;
TRUNCATE referece_table CASCADE;
ROLLBACK;
-- case 5.1: Parallel UPDATE on distributed table follow by a SELECT
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
SELECT count(*) FROM referece_table;
ROLLBACK;
-- case 5.2: Parallel UPDATE on distributed table follow by a UPDATE
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
UPDATE referece_table SET id = 160 WHERE id = 15;
ROLLBACK;
-- case 5.3: Parallel UPDATE on distributed table follow by an unrelated DDL on reference table
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
ALTER TABLE referece_table ADD COLUMN X INT;
ROLLBACK;
-- case 5.4: Parallel UPDATE on distributed table follow by a related DDL on reference table
-- FIXME: Can we do better?
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
ALTER TABLE referece_table ALTER COLUMN id SET DATA TYPE smallint;
ROLLBACK;
-- case 6:1: Unrelated parallel DDL on distributed table followed by SELECT on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
SELECT count(*) FROM referece_table;
ROLLBACK;
-- case 6:2: Related parallel DDL on distributed table followed by SELECT on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ALTER COLUMN value_1 SET DATA TYPE smallint;
UPDATE referece_table SET id = 160 WHERE id = 15;
ROLLBACK;
-- case 6:3: Unrelated parallel DDL on distributed table followed by UPDATE on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
SELECT count(*) FROM referece_table;
ROLLBACK;
-- case 6:4: Related parallel DDL on distributed table followed by SELECT on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
UPDATE referece_table SET id = 160 WHERE id = 15;
ROLLBACK;
-- case 6:5: Unrelated parallel DDL on distributed table followed by unrelated DDL on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
ALTER TABLE referece_table ADD COLUMN X int;
ROLLBACK;
-- case 6:6: Unrelated parallel DDL on distributed table followed by related DDL on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
ALTER TABLE on_update_fkey_table ALTER COLUMN value_1 SET DATA TYPE smallint;
ROLLBACK;
-- some more extensive tests
-- UPDATE on dist table is followed by DELETE to reference table
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
DELETE FROM referece_table WHERE id = 99;
ROLLBACK;
-- an unrelated update followed by update on dist table and update
-- on reference table
BEGIN;
UPDATE unrelated_dist_table SET value_1 = 15;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
UPDATE referece_table SET id = 101 WHERE id = 99;
ROLLBACK;
-- an unrelated update followed by update on the reference table and update
-- on the cascading distributed table
-- note that the UPDATE on the reference table will try to set the execution
-- mode to sequential, which will fail since there is an already opened
-- parallel connections
BEGIN;
UPDATE unrelated_dist_table SET value_1 = 15;
UPDATE referece_table SET id = 101 WHERE id = 99;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
ROLLBACK;
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id));
SELECT create_distributed_table('test_table_2', 'id');
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1 CASCADE;
ROLLBACK;
-- the fails since we're trying to switch sequential mode after
-- already executed a parallel query
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
CREATE TABLE tt4(id int PRIMARY KEY, value_1 int, FOREIGN KEY(id) REFERENCES tt4(id));
SELECT create_distributed_table('tt4', 'id');
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id), FOREIGN KEY(id) REFERENCES tt4(id));
SELECT create_distributed_table('test_table_2', 'id');
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1 CASCADE;
ROLLBACK;
-- same test with the above, but this time using
-- sequential mode, succeeds
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
CREATE TABLE tt4(id int PRIMARY KEY, value_1 int, FOREIGN KEY(id) REFERENCES tt4(id));
SELECT create_distributed_table('tt4', 'id');
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id), FOREIGN KEY(id) REFERENCES tt4(id));
SELECT create_distributed_table('test_table_2', 'id');
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1 CASCADE;
ROLLBACK;
-- another test with ALTER TABLE fails since we're already opened
-- parallel connection via create_distributed_table(), later
-- adding foreign key to reference table fails
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_2', 'id');
ALTER TABLE test_table_2 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_1(id);
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1, test_table_2;
COMMIT;
-- same test with the above on sequential mode should work fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_2', 'id');
ALTER TABLE test_table_2 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_1(id);
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1, test_table_2;
COMMIT;
-- similar test with the above, but this time the order of
-- create_distributed_table and create_reference_table is
-- changed
BEGIN;
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_2', 'id');
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
ALTER TABLE test_table_2 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_1(id);
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1 CASCADE;
ROLLBACK;
-- same test in sequential mode should succeed
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_2', 'id');
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
ALTER TABLE test_table_2 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_1(id);
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1 CASCADE;
ROLLBACK;
-- again a very similar test, but this time
-- a parallel SELECT is already executed before
-- setting the mode to sequential should fail
BEGIN;
SELECT count(*) FROM on_update_fkey_table;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_2', 'id');
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
ALTER TABLE test_table_2 ADD CONSTRAINT c_check FOREIGN KEY (value_1) REFERENCES test_table_1(id);
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_1 CASCADE;
ROLLBACK;
-- make sure that we cannot create hash distributed tables with
-- foreign keys to reference tables when they have data in it
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
INSERT INTO test_table_1 SELECT i FROM generate_series(0,100) i;
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id));
INSERT INTO test_table_2 SELECT i, i FROM generate_series(0,100) i;
SELECT create_reference_table('test_table_1');
SELECT create_distributed_table('test_table_2', 'id');
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_2, test_table_1;
COMMIT;
-- the same test with above in sequential mode would still not work
-- since COPY cannot be executed in sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE test_table_1(id int PRIMARY KEY);
INSERT INTO test_table_1 SELECT i FROM generate_series(0,100) i;
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id));
INSERT INTO test_table_2 SELECT i, i FROM generate_series(0,100) i;
SELECT create_reference_table('test_table_1');
SELECT create_distributed_table('test_table_2', 'id');
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_2, test_table_1;
COMMIT;
-- we should be able to execute and DML/DDL/SELECT after we've
-- switched to sequential via create_distributed_table
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id));
SELECT create_reference_table('test_table_1');
SELECT create_distributed_table('test_table_2', 'id');
-- and maybe some other test
CREATE INDEX i1 ON test_table_1(id);
ALTER TABLE test_table_2 ADD CONSTRAINT check_val CHECK (id > 0);
SELECT count(*) FROM test_table_2;
SELECT count(*) FROM test_table_1;
UPDATE test_table_2 SET value_1 = 15;
-- make sure that the output isn't too verbose
SET LOCAL client_min_messages TO ERROR;
DROP TABLE test_table_2, test_table_1;
COMMIT;
RESET client_min_messages;
DROP SCHEMA test_fkey_to_ref_in_tx CASCADE;
SET search_path TO public;

View File

@ -623,7 +623,6 @@ BEGIN;
ROLLBACK;
-- create_reference_table, create_distributed_table and ALTER TABLE in the same transaction
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
SELECT create_reference_table('test_table_1');
@ -637,7 +636,6 @@ BEGIN;
COMMIT;
-- the order of create_reference_table and create_distributed_table is changed
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_1', 'id');
@ -665,7 +663,6 @@ BEGIN;
COMMIT;
-- make sure that other DDLs/DMLs also work fine
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int, FOREIGN KEY(value_1) REFERENCES test_table_1(id));
@ -694,7 +691,6 @@ SELECT count(*) FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_reference_tab
DROP TABLE test_table_1, test_table_2;
-- check if we can drop the foreign constraint in a transaction right after ADD CONSTRAINT
-- FIXME: fails for now
BEGIN;
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
@ -894,7 +890,6 @@ DROP TABLE test_table_1, test_table_2;
-- check if we successfuly set multi_shard_modify_mode to sequential after sequentially running DDLs
-- in transaction since the upcoming DDLs need to run sequentially.
-- FIXME: fails for now
CREATE TABLE test_table_1(id int PRIMARY KEY);
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
CREATE TABLE test_table_3(id int PRIMARY KEY, value_1 int);