mirror of https://github.com/citusdata/citus.git
Merge pull request #1455 from citusdata/join_xact_modification_levels
Rework connection API to remove transaction restrictions (pull/1462/head)
commit
04e2d64764
|
@ -14,7 +14,9 @@
|
||||||
#include "access/hash.h"
|
#include "access/hash.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/hash_helpers.h"
|
#include "distributed/hash_helpers.h"
|
||||||
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/multi_planner.h"
|
||||||
#include "distributed/placement_connection.h"
|
#include "distributed/placement_connection.h"
|
||||||
#include "utils/hsearch.h"
|
#include "utils/hsearch.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
@ -47,17 +49,14 @@ typedef struct ConnectionReference
|
||||||
bool hadDML;
|
bool hadDML;
|
||||||
bool hadDDL;
|
bool hadDDL;
|
||||||
|
|
||||||
/*
|
|
||||||
* Membership in ConnectionPlacementHashEntry->connectionReferences or
|
|
||||||
* ColocatedPlacementsHashEntry->connectionReferences.
|
|
||||||
*/
|
|
||||||
dlist_node placementNode;
|
|
||||||
|
|
||||||
/* membership in MultiConnection->referencedPlacements */
|
/* membership in MultiConnection->referencedPlacements */
|
||||||
dlist_node connectionNode;
|
dlist_node connectionNode;
|
||||||
} ConnectionReference;
|
} ConnectionReference;
|
||||||
|
|
||||||
|
|
||||||
|
struct ColocatedPlacementsHashEntry;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Hash table mapping placements to a list of connections.
|
* Hash table mapping placements to a list of connections.
|
||||||
*
|
*
|
||||||
|
@ -85,11 +84,14 @@ typedef struct ConnectionPlacementHashEntry
|
||||||
/* did any remote transactions fail? */
|
/* did any remote transactions fail? */
|
||||||
bool failed;
|
bool failed;
|
||||||
|
|
||||||
/* list of connections to remote nodes */
|
/* primary connection used to access the placement */
|
||||||
dlist_head connectionReferences;
|
ConnectionReference *primaryConnection;
|
||||||
|
|
||||||
/* if non-NULL, connection executing DML/DDL*/
|
/* are any other connections reading from the placements? */
|
||||||
ConnectionReference *modifyingConnection;
|
bool hasSecondaryConnections;
|
||||||
|
|
||||||
|
/* entry for the set of co-located placements */
|
||||||
|
struct ColocatedPlacementsHashEntry *colocatedEntry;
|
||||||
|
|
||||||
/* membership in ConnectionShardHashEntry->placementConnections */
|
/* membership in ConnectionShardHashEntry->placementConnections */
|
||||||
dlist_node shardNode;
|
dlist_node shardNode;
|
||||||
|
@ -137,11 +139,11 @@ typedef struct ColocatedPlacementsHashEntry
|
||||||
{
|
{
|
||||||
ColocatedPlacementsHashKey key;
|
ColocatedPlacementsHashKey key;
|
||||||
|
|
||||||
/* list of connections to remote nodes */
|
/* primary connection used to access the co-located placements */
|
||||||
dlist_head connectionReferences;
|
ConnectionReference *primaryConnection;
|
||||||
|
|
||||||
/* if non-NULL, connection executing DML/DDL*/
|
/* are any other connections reading from the placements? */
|
||||||
ConnectionReference *modifyingConnection;
|
bool hasSecondaryConnections;
|
||||||
} ColocatedPlacementsHashEntry;
|
} ColocatedPlacementsHashEntry;
|
||||||
|
|
||||||
static HTAB *ColocatedPlacementsHash;
|
static HTAB *ColocatedPlacementsHash;
|
||||||
|
@ -172,19 +174,10 @@ typedef struct ConnectionShardHashEntry
|
||||||
static HTAB *ConnectionShardHash;
|
static HTAB *ConnectionShardHash;
|
||||||
|
|
||||||
|
|
||||||
static MultiConnection * StartColocatedPlacementConnection(uint32 flags,
|
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(
|
||||||
ShardPlacement *placement,
|
ShardPlacement *placement);
|
||||||
const char *userName);
|
|
||||||
static ConnectionReference * CheckExistingPlacementConnections(uint32 flags,
|
|
||||||
ConnectionPlacementHashEntry
|
|
||||||
*placementEntry,
|
|
||||||
const char *userName);
|
|
||||||
static ConnectionReference * CheckExistingColocatedConnections(uint32 flags,
|
|
||||||
ColocatedPlacementsHashEntry
|
|
||||||
*connectionEntry,
|
|
||||||
const char *userName);
|
|
||||||
static bool CanUseExistingConnection(uint32 flags, const char *userName,
|
static bool CanUseExistingConnection(uint32 flags, const char *userName,
|
||||||
ConnectionReference *connectionReference);
|
ConnectionReference *placementConnection);
|
||||||
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
|
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
|
||||||
ShardPlacement *placement);
|
ShardPlacement *placement);
|
||||||
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
|
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
|
||||||
|
@ -229,76 +222,355 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user
|
||||||
MultiConnection *
|
MultiConnection *
|
||||||
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
|
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
|
||||||
{
|
{
|
||||||
ConnectionPlacementHashKey key;
|
ShardPlacementAccess *placementAccess =
|
||||||
ConnectionPlacementHashEntry *placementEntry = NULL;
|
(ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));
|
||||||
ConnectionReference *returnConnectionReference = NULL;
|
|
||||||
|
placementAccess->placement = placement;
|
||||||
|
|
||||||
|
if (flags & FOR_DDL)
|
||||||
|
{
|
||||||
|
placementAccess->accessType = PLACEMENT_ACCESS_DDL;
|
||||||
|
}
|
||||||
|
else if (flags & FOR_DML)
|
||||||
|
{
|
||||||
|
placementAccess->accessType = PLACEMENT_ACCESS_DML;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
placementAccess->accessType = PLACEMENT_ACCESS_SELECT;
|
||||||
|
}
|
||||||
|
|
||||||
|
return StartPlacementListConnection(flags, list_make1(placementAccess), userName);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetPlacementListConnection establishes a connection for a set of placement
|
||||||
|
* accesses.
|
||||||
|
*
|
||||||
|
* See StartPlacementListConnection for details.
|
||||||
|
*/
|
||||||
|
MultiConnection *
|
||||||
|
GetPlacementListConnection(uint32 flags, List *placementAccessList, const char *userName)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = StartPlacementListConnection(flags, placementAccessList,
|
||||||
|
userName);
|
||||||
|
|
||||||
|
FinishConnectionEstablishment(connection);
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StartPlacementListConnection returns a connection to a remote node suitable for
|
||||||
|
* a set of placement accesses (SELECT, DML, DDL) or throws an error if no suitable
|
||||||
|
* connection can be established, or if doing so would cause a self-deadlock or consistency
|
||||||
|
* violation.
|
||||||
|
*/
|
||||||
|
MultiConnection *
|
||||||
|
StartPlacementListConnection(uint32 flags, List *placementAccessList,
|
||||||
|
const char *userName)
|
||||||
|
{
|
||||||
char *freeUserName = NULL;
|
char *freeUserName = NULL;
|
||||||
bool found = false;
|
bool foundModifyingConnection = false;
|
||||||
|
ListCell *placementAccessCell = NULL;
|
||||||
|
List *placementEntryList = NIL;
|
||||||
|
ListCell *placementEntryCell = NULL;
|
||||||
|
MultiConnection *chosenConnection = NULL;
|
||||||
|
|
||||||
if (userName == NULL)
|
if (userName == NULL)
|
||||||
{
|
{
|
||||||
userName = freeUserName = CurrentUserName();
|
userName = freeUserName = CurrentUserName();
|
||||||
}
|
}
|
||||||
|
|
||||||
key.placementId = placement->placementId;
|
/*
|
||||||
|
* Go through all placement accesses to find a suitable connection.
|
||||||
/* lookup relevant hash entry */
|
*
|
||||||
placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
|
* If none of the placements have been accessed in this transaction, chosenConnection
|
||||||
if (!found)
|
* remains NULL.
|
||||||
|
*
|
||||||
|
* If one or more of the placements have been modified in this transaction, then
|
||||||
|
* use the connection that performed the write. If placements have been written
|
||||||
|
* over multiple connections or the connection is not available, error out.
|
||||||
|
*
|
||||||
|
* If placements have only been read in this transaction, then use the last
|
||||||
|
* suitable connection found for a placement in the placementAccessList.
|
||||||
|
*/
|
||||||
|
foreach(placementAccessCell, placementAccessList)
|
||||||
{
|
{
|
||||||
dlist_init(&placementEntry->connectionReferences);
|
ShardPlacementAccess *placementAccess =
|
||||||
placementEntry->failed = false;
|
(ShardPlacementAccess *) lfirst(placementAccessCell);
|
||||||
placementEntry->modifyingConnection = NULL;
|
ShardPlacement *placement = placementAccess->placement;
|
||||||
|
ShardPlacementAccessType accessType = placementAccess->accessType;
|
||||||
|
|
||||||
|
ConnectionPlacementHashEntry *placementEntry = NULL;
|
||||||
|
ColocatedPlacementsHashEntry *colocatedEntry = NULL;
|
||||||
|
ConnectionReference *placementConnection = NULL;
|
||||||
|
|
||||||
|
if (placement->shardId == INVALID_SHARD_ID)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* When a SELECT prunes down to 0 shards, we use a dummy placement.
|
||||||
|
* In that case, we can fall back to the default connection.
|
||||||
|
*
|
||||||
|
* FIXME: this can be removed if we evaluate empty SELECTs locally.
|
||||||
|
*/
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
placementEntry = FindOrCreatePlacementEntry(placement);
|
||||||
|
colocatedEntry = placementEntry->colocatedEntry;
|
||||||
|
placementConnection = placementEntry->primaryConnection;
|
||||||
|
|
||||||
|
/* note: the Asserts below are primarily for clarifying the conditions */
|
||||||
|
|
||||||
|
if (placementConnection->connection == NULL)
|
||||||
|
{
|
||||||
|
/* no connection has been chosen for the placement */
|
||||||
|
}
|
||||||
|
else if (accessType == PLACEMENT_ACCESS_DDL &&
|
||||||
|
placementEntry->hasSecondaryConnections)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If a placement has been read over multiple connections (typically as
|
||||||
|
* a result of a reference table join) then a DDL command on the placement
|
||||||
|
* would create a self-deadlock.
|
||||||
|
*/
|
||||||
|
|
||||||
|
Assert(placementConnection != NULL);
|
||||||
|
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg(
|
||||||
|
"cannot perform DDL on placement %ld, which has been read over "
|
||||||
|
"multiple connections",
|
||||||
|
placement->placementId)));
|
||||||
|
}
|
||||||
|
else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL &&
|
||||||
|
colocatedEntry->hasSecondaryConnections)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If a placement has been read over multiple (uncommitted) connections
|
||||||
|
* then a DDL command on a co-located placement may create a self-deadlock
|
||||||
|
* if there exists some relationship between the co-located placements
|
||||||
|
* (e.g. foreign key, partitioning).
|
||||||
|
*/
|
||||||
|
|
||||||
|
Assert(placementConnection != NULL);
|
||||||
|
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot perform DDL on placement %ld since a co-located "
|
||||||
|
"placement has been read over multiple connections",
|
||||||
|
placement->placementId)));
|
||||||
|
}
|
||||||
|
else if (foundModifyingConnection)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We already found a connection that performed writes on one of the placements
|
||||||
|
* and must use it.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if ((placementConnection->hadDDL || placementConnection->hadDML) &&
|
||||||
|
placementConnection->connection != chosenConnection)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The current placement may have been modified over a different
|
||||||
|
* connection. Neither connection is guaranteed to see all uncommitted
|
||||||
|
* writes and therefore we cannot proceed.
|
||||||
|
*/
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot perform query with placements that were "
|
||||||
|
"modified over multiple connections")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (CanUseExistingConnection(flags, userName, placementConnection))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* There is an existing connection for the placement and we can use it.
|
||||||
|
*/
|
||||||
|
|
||||||
|
Assert(placementConnection != NULL);
|
||||||
|
|
||||||
|
chosenConnection = placementConnection->connection;
|
||||||
|
|
||||||
|
if (placementConnection->hadDDL || placementConnection->hadDML)
|
||||||
|
{
|
||||||
|
/* this connection performed writes, we must use it */
|
||||||
|
foundModifyingConnection = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (placementConnection->hadDDL)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* There is an existing connection, but we cannot use it and it executed
|
||||||
|
* DDL. Any subsequent operation needs to be able to see the results of
|
||||||
|
* the DDL command and thus cannot proceed if it cannot use the connection.
|
||||||
|
*/
|
||||||
|
|
||||||
|
Assert(placementConnection != NULL);
|
||||||
|
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||||
|
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot establish a new connection for placement %ld, since "
|
||||||
|
"DDL has been executed on a connection that is in use",
|
||||||
|
placement->placementId)));
|
||||||
|
}
|
||||||
|
else if (placementConnection->hadDML)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* There is an existing connection, but we cannot use it and it executed
|
||||||
|
* DML. Any subsequent operation needs to be able to see the results of
|
||||||
|
* the DML command and thus cannot proceed if it cannot use the connection.
|
||||||
|
*
|
||||||
|
* Note that this is not meaningfully different from the previous case. We
|
||||||
|
* just produce a different error message based on whether DDL was or only
|
||||||
|
* DML was executed.
|
||||||
|
*/
|
||||||
|
|
||||||
|
Assert(placementConnection != NULL);
|
||||||
|
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||||
|
Assert(!placementConnection->hadDDL);
|
||||||
|
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot establish a new connection for placement %ld, since "
|
||||||
|
"DML has been executed on a connection that is in use",
|
||||||
|
placement->placementId)));
|
||||||
|
}
|
||||||
|
else if (accessType == PLACEMENT_ACCESS_DDL)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* There is an existing connection, but we cannot use it and we want to
|
||||||
|
* execute DDL. The operation on the existing connection might conflict
|
||||||
|
* with the DDL statement.
|
||||||
|
*/
|
||||||
|
|
||||||
|
Assert(placementConnection != NULL);
|
||||||
|
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||||
|
Assert(!placementConnection->hadDDL);
|
||||||
|
Assert(!placementConnection->hadDML);
|
||||||
|
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot perform a parallel DDL command because multiple "
|
||||||
|
"placements have been accessed over the same connection")));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The placement has a connection assigned to it, but it cannot be used,
|
||||||
|
* most likely because it has been claimed exclusively. Fortunately, it
|
||||||
|
* has only been used for reads and we're not performing a DDL command.
|
||||||
|
* We can therefore use a different connection for this placement.
|
||||||
|
*/
|
||||||
|
|
||||||
|
Assert(placementConnection != NULL);
|
||||||
|
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||||
|
Assert(!placementConnection->hadDDL);
|
||||||
|
Assert(!placementConnection->hadDML);
|
||||||
|
Assert(accessType != PLACEMENT_ACCESS_DDL);
|
||||||
|
}
|
||||||
|
|
||||||
|
placementEntryList = lappend(placementEntryList, placementEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (chosenConnection == NULL)
|
||||||
|
{
|
||||||
|
/* use the first placement from the list to extract nodename and nodeport */
|
||||||
|
ShardPlacementAccess *placementAccess =
|
||||||
|
(ShardPlacementAccess *) linitial(placementAccessList);
|
||||||
|
ShardPlacement *placement = placementAccess->placement;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* No suitable connection in the placement->connection mapping, get one from
|
||||||
|
* the node->connection pool.
|
||||||
|
*/
|
||||||
|
chosenConnection = StartNodeConnection(flags, placement->nodeName,
|
||||||
|
placement->nodePort);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check whether any of the connections already associated with the
|
* Now that a connection has been chosen, initialise or update the connection
|
||||||
* placement can be reused, or violates FOR_DML/FOR_DDL constraints.
|
* references for all placements.
|
||||||
*/
|
*/
|
||||||
returnConnectionReference =
|
forboth(placementAccessCell, placementAccessList,
|
||||||
CheckExistingPlacementConnections(flags, placementEntry, userName);
|
placementEntryCell, placementEntryList)
|
||||||
|
|
||||||
/*
|
|
||||||
* Either no caching desired, or no connection already associated with the
|
|
||||||
* placement present. Check whether there's a usable connection associated
|
|
||||||
* for the set of colocated placements, or establish a new one.
|
|
||||||
*
|
|
||||||
* Allocations are performed in transaction context, so we don't have to
|
|
||||||
* care about freeing in case of an early disconnect.
|
|
||||||
*/
|
|
||||||
if (returnConnectionReference == NULL)
|
|
||||||
{
|
{
|
||||||
MultiConnection *connection =
|
ShardPlacementAccess *placementAccess =
|
||||||
StartColocatedPlacementConnection(flags, placement, userName);
|
(ShardPlacementAccess *) lfirst(placementAccessCell);
|
||||||
|
ShardPlacementAccessType accessType = placementAccess->accessType;
|
||||||
|
ConnectionPlacementHashEntry *placementEntry =
|
||||||
|
(ConnectionPlacementHashEntry *) lfirst(placementEntryCell);
|
||||||
|
ConnectionReference *placementConnection = placementEntry->primaryConnection;
|
||||||
|
|
||||||
returnConnectionReference = (ConnectionReference *)
|
if (placementConnection->connection == chosenConnection)
|
||||||
MemoryContextAlloc(TopTransactionContext,
|
{
|
||||||
sizeof(ConnectionReference));
|
/* using the connection that was already assigned to the placement */
|
||||||
returnConnectionReference->connection = connection;
|
}
|
||||||
returnConnectionReference->hadDDL = false;
|
else if (placementConnection->connection == NULL)
|
||||||
returnConnectionReference->hadDML = false;
|
{
|
||||||
returnConnectionReference->userName =
|
/* placement does not have a connection assigned yet */
|
||||||
MemoryContextStrdup(TopTransactionContext, userName);
|
placementConnection->connection = chosenConnection;
|
||||||
dlist_push_tail(&placementEntry->connectionReferences,
|
placementConnection->hadDDL = false;
|
||||||
&returnConnectionReference->placementNode);
|
placementConnection->hadDML = false;
|
||||||
|
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
|
||||||
/* record association with shard, for invalidation */
|
userName);
|
||||||
AssociatePlacementWithShard(placementEntry, placement);
|
|
||||||
|
|
||||||
/* record association with connection, to handle connection closure */
|
/* record association with connection, to handle connection closure */
|
||||||
dlist_push_tail(&connection->referencedPlacements,
|
dlist_push_tail(&chosenConnection->referencedPlacements,
|
||||||
&returnConnectionReference->connectionNode);
|
&placementConnection->connectionNode);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* using a different connection than the one assigned to the placement */
|
||||||
|
|
||||||
|
if (accessType != PLACEMENT_ACCESS_SELECT)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We previously read from the placement, but now we're writing to
|
||||||
|
* it (if we had written to the placement, we would have either chosen
|
||||||
|
* the same connection, or errored out). Update the connection reference
|
||||||
|
* to point to the connection used for writing. We don't need to remember
|
||||||
|
* the existing connection since we won't be able to reuse it for
|
||||||
|
* accessing the placement. However, we do register that it exists in
|
||||||
|
* hasSecondaryConnections.
|
||||||
|
*/
|
||||||
|
placementConnection->connection = chosenConnection;
|
||||||
|
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
|
||||||
|
userName);
|
||||||
|
|
||||||
|
Assert(!placementConnection->hadDDL);
|
||||||
|
Assert(!placementConnection->hadDML);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (flags & FOR_DDL)
|
/*
|
||||||
|
* There are now multiple connections that read from the placement
|
||||||
|
* and DDL commands are forbidden.
|
||||||
|
*/
|
||||||
|
placementEntry->hasSecondaryConnections = true;
|
||||||
|
|
||||||
|
if (placementEntry->colocatedEntry != NULL)
|
||||||
{
|
{
|
||||||
placementEntry->modifyingConnection = returnConnectionReference;
|
/* we also remember this for co-located placements */
|
||||||
returnConnectionReference->hadDDL = true;
|
placementEntry->colocatedEntry->hasSecondaryConnections = true;
|
||||||
}
|
}
|
||||||
if (flags & FOR_DML)
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Remember that we used the current connection for writes.
|
||||||
|
*/
|
||||||
|
if (accessType == PLACEMENT_ACCESS_DDL)
|
||||||
{
|
{
|
||||||
placementEntry->modifyingConnection = returnConnectionReference;
|
placementConnection->hadDDL = true;
|
||||||
returnConnectionReference->hadDML = true;
|
}
|
||||||
|
|
||||||
|
if (accessType == PLACEMENT_ACCESS_DML)
|
||||||
|
{
|
||||||
|
placementConnection->hadDML = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (freeUserName)
|
if (freeUserName)
|
||||||
|
@ -306,195 +578,82 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
|
||||||
pfree(freeUserName);
|
pfree(freeUserName);
|
||||||
}
|
}
|
||||||
|
|
||||||
return returnConnectionReference->connection;
|
return chosenConnection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CheckExistingPlacementConnections check whether any of the existing
|
* FindOrCreatePlacementEntry finds a placement entry in either the
|
||||||
* connections is usable. If so, return it, otherwise return NULL.
|
* placement->connection hash or the co-located placements->connection hash,
|
||||||
*
|
* or adds a new entry if the placement has not yet been accessed in the
|
||||||
* A connection is usable if it is not in use, the user matches, DDL/DML usage
|
* current transaction.
|
||||||
* match and cached connection are allowed. If no existing connection
|
|
||||||
* matches, but a new connection would conflict (e.g. a connection already
|
|
||||||
* exists but isn't usable, and the desired connection needs to execute
|
|
||||||
* DML/DML) an error is thrown.
|
|
||||||
*/
|
*/
|
||||||
static ConnectionReference *
|
static ConnectionPlacementHashEntry *
|
||||||
CheckExistingPlacementConnections(uint32 flags,
|
FindOrCreatePlacementEntry(ShardPlacement *placement)
|
||||||
ConnectionPlacementHashEntry *placementEntry,
|
|
||||||
const char *userName)
|
|
||||||
{
|
{
|
||||||
dlist_iter it;
|
ConnectionPlacementHashKey key;
|
||||||
|
ConnectionPlacementHashEntry *placementEntry = NULL;
|
||||||
|
bool found = false;
|
||||||
|
|
||||||
|
key.placementId = placement->placementId;
|
||||||
|
|
||||||
|
placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
/* no connection has been chosen for this placement */
|
||||||
|
placementEntry->failed = false;
|
||||||
|
placementEntry->primaryConnection = NULL;
|
||||||
|
placementEntry->hasSecondaryConnections = false;
|
||||||
|
placementEntry->colocatedEntry = NULL;
|
||||||
|
|
||||||
|
if (placement->partitionMethod == DISTRIBUTE_BY_HASH ||
|
||||||
|
placement->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
|
{
|
||||||
|
ColocatedPlacementsHashKey key;
|
||||||
|
ColocatedPlacementsHashEntry *colocatedEntry = NULL;
|
||||||
|
|
||||||
|
strcpy(key.nodeName, placement->nodeName);
|
||||||
|
key.nodePort = placement->nodePort;
|
||||||
|
key.colocationGroupId = placement->colocationGroupId;
|
||||||
|
key.representativeValue = placement->representativeValue;
|
||||||
|
|
||||||
|
/* look for a connection assigned to co-located placements */
|
||||||
|
colocatedEntry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER,
|
||||||
|
&found);
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
void *conRef = MemoryContextAllocZero(TopTransactionContext,
|
||||||
|
sizeof(ConnectionReference));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If there's a connection that has executed DML/DDL, always return it if
|
* Create a connection reference that can be used for the entire
|
||||||
* possible. That's because we only can execute DML/DDL over that
|
* set of co-located placements.
|
||||||
* connection. Checking this first also allows us to detect conflicts due
|
|
||||||
* to opening a second connection that wants to execute DML/DDL.
|
|
||||||
*/
|
*/
|
||||||
if (placementEntry->modifyingConnection)
|
colocatedEntry->primaryConnection = (ConnectionReference *) conRef;
|
||||||
{
|
|
||||||
ConnectionReference *connectionReference = placementEntry->modifyingConnection;
|
|
||||||
|
|
||||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
colocatedEntry->hasSecondaryConnections = false;
|
||||||
{
|
|
||||||
return connectionReference;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connectionReference->hadDDL)
|
/*
|
||||||
{
|
* Assign the connection reference for the set of co-located placements
|
||||||
/* would deadlock otherwise */
|
* to the current placement.
|
||||||
ereport(ERROR,
|
*/
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
placementEntry->primaryConnection = colocatedEntry->primaryConnection;
|
||||||
errmsg("cannot establish new placement connection when DDL has been "
|
placementEntry->colocatedEntry = colocatedEntry;
|
||||||
"executed on existing placement connection")));
|
|
||||||
}
|
|
||||||
else if (connectionReference->hadDML)
|
|
||||||
{
|
|
||||||
/* we'd not see the other connection's contents */
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("cannot establish new placement connection when DML has been "
|
|
||||||
"executed on existing placement connection")));
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* modifying xact should have executed DML/DDL */
|
void *conRef = MemoryContextAllocZero(TopTransactionContext,
|
||||||
Assert(false);
|
sizeof(ConnectionReference));
|
||||||
|
|
||||||
|
placementEntry->primaryConnection = (ConnectionReference *) conRef;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/* record association with shard, for invalidation */
|
||||||
* Search existing connections for a reusable connection.
|
AssociatePlacementWithShard(placementEntry, placement);
|
||||||
*/
|
|
||||||
dlist_foreach(it, &placementEntry->connectionReferences)
|
|
||||||
{
|
|
||||||
ConnectionReference *connectionReference =
|
|
||||||
dlist_container(ConnectionReference, placementNode, it.cur);
|
|
||||||
|
|
||||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
return placementEntry;
|
||||||
{
|
|
||||||
return connectionReference;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If not using the connection, verify that FOR_DML/DDL flags are
|
|
||||||
* compatible.
|
|
||||||
*/
|
|
||||||
if (flags & FOR_DDL)
|
|
||||||
{
|
|
||||||
/* would deadlock otherwise */
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("cannot establish new placement connection for DDL when "
|
|
||||||
"other connections exist")));
|
|
||||||
}
|
|
||||||
else if (flags & FOR_DML)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We could allow this case (as presumably the SELECT is done, but
|
|
||||||
* it'd be a bit prone to programming errors, so we disallow for
|
|
||||||
* now.
|
|
||||||
*/
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("cannot establish new placement connection for DML when "
|
|
||||||
"other connections exist")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* establish a new connection */
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static ConnectionReference *
|
|
||||||
CheckExistingColocatedConnections(uint32 flags,
|
|
||||||
ColocatedPlacementsHashEntry *connectionsEntry,
|
|
||||||
const char *userName)
|
|
||||||
{
|
|
||||||
dlist_iter it;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If there's a connection that has executed DML/DDL, always return it if
|
|
||||||
* possible. That's because we only can execute DML/DDL over that
|
|
||||||
* connection. Checking this first also allows us to detect conflicts due
|
|
||||||
* to opening a second connection that wants to execute DML/DDL.
|
|
||||||
*/
|
|
||||||
if (connectionsEntry->modifyingConnection)
|
|
||||||
{
|
|
||||||
ConnectionReference *connectionReference = connectionsEntry->modifyingConnection;
|
|
||||||
|
|
||||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
|
||||||
{
|
|
||||||
return connectionReference;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connectionReference->hadDDL)
|
|
||||||
{
|
|
||||||
/* would deadlock otherwise */
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("cannot establish new placement connection when DDL has been "
|
|
||||||
"executed on existing placement connection for a colocated placement")));
|
|
||||||
}
|
|
||||||
else if (connectionReference->hadDML)
|
|
||||||
{
|
|
||||||
/* we'd not see the other connection's contents */
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("cannot establish new placement connection when DML has been "
|
|
||||||
"executed on existing placement connection for a colocated placement")));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* modifying xact should have executed DML/DDL */
|
|
||||||
Assert(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Search existing connections for a reusable connection.
|
|
||||||
*/
|
|
||||||
dlist_foreach(it, &connectionsEntry->connectionReferences)
|
|
||||||
{
|
|
||||||
ConnectionReference *connectionReference =
|
|
||||||
dlist_container(ConnectionReference, placementNode, it.cur);
|
|
||||||
|
|
||||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
|
||||||
{
|
|
||||||
return connectionReference;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If not using the connection, verify that FOR_DML/DDL flags are
|
|
||||||
* compatible.
|
|
||||||
*/
|
|
||||||
if (flags & FOR_DDL)
|
|
||||||
{
|
|
||||||
/* would deadlock otherwise */
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("cannot establish new placement connection for DDL when "
|
|
||||||
"other connections exist for colocated placements")));
|
|
||||||
}
|
|
||||||
else if (flags & FOR_DML)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We could allow this case (as presumably the SELECT is done, but
|
|
||||||
* it'd be a bit prone to programming errors, so we disallow for
|
|
||||||
* now.
|
|
||||||
*/
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("cannot establish new placement connection for DML when "
|
|
||||||
"other connections exist for colocated placements")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* establish a new connection */
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -535,86 +694,6 @@ CanUseExistingConnection(uint32 flags, const char *userName,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* StartColocatedPlacementConnection returns a connection that's usable for
|
|
||||||
* the passed in flags/placement.
|
|
||||||
*
|
|
||||||
* It does so by first checking whether any connection is already associated
|
|
||||||
* with the relevant entry in the colocated placement hash, or by asking for a
|
|
||||||
* formerly unassociated (possibly new) connection. Errors out if there's
|
|
||||||
* conflicting connections.
|
|
||||||
*/
|
|
||||||
static MultiConnection *
|
|
||||||
StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement,
|
|
||||||
const char *userName)
|
|
||||||
{
|
|
||||||
ConnectionReference *connectionReference = NULL;
|
|
||||||
ColocatedPlacementsHashKey key;
|
|
||||||
ColocatedPlacementsHashEntry *entry;
|
|
||||||
bool found;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Currently only hash partitioned tables can be colocated. For everything
|
|
||||||
* else we just return a new connection.
|
|
||||||
*/
|
|
||||||
if (placement->partitionMethod != DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
|
||||||
return StartNodeConnection(flags, placement->nodeName, placement->nodePort);
|
|
||||||
}
|
|
||||||
|
|
||||||
strcpy(key.nodeName, placement->nodeName);
|
|
||||||
key.nodePort = placement->nodePort;
|
|
||||||
key.colocationGroupId = placement->colocationGroupId;
|
|
||||||
key.representativeValue = placement->representativeValue;
|
|
||||||
|
|
||||||
entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found);
|
|
||||||
if (!found)
|
|
||||||
{
|
|
||||||
memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey),
|
|
||||||
0,
|
|
||||||
sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey));
|
|
||||||
dlist_init(&entry->connectionReferences);
|
|
||||||
}
|
|
||||||
|
|
||||||
connectionReference =
|
|
||||||
CheckExistingColocatedConnections(flags, entry, userName);
|
|
||||||
|
|
||||||
if (!connectionReference)
|
|
||||||
{
|
|
||||||
MultiConnection *connection = NULL;
|
|
||||||
|
|
||||||
connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort);
|
|
||||||
connectionReference = (ConnectionReference *)
|
|
||||||
MemoryContextAlloc(TopTransactionContext,
|
|
||||||
sizeof(ConnectionReference));
|
|
||||||
connectionReference->connection = connection;
|
|
||||||
connectionReference->hadDDL = false;
|
|
||||||
connectionReference->hadDML = false;
|
|
||||||
connectionReference->userName =
|
|
||||||
MemoryContextStrdup(TopTransactionContext, userName);
|
|
||||||
dlist_push_tail(&entry->connectionReferences,
|
|
||||||
&connectionReference->placementNode);
|
|
||||||
|
|
||||||
/* record association with connection, to handle connection closure */
|
|
||||||
dlist_push_tail(&connection->referencedPlacements,
|
|
||||||
&connectionReference->connectionNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flags & FOR_DDL)
|
|
||||||
{
|
|
||||||
entry->modifyingConnection = connectionReference;
|
|
||||||
connectionReference->hadDDL = true;
|
|
||||||
}
|
|
||||||
if (flags & FOR_DML)
|
|
||||||
{
|
|
||||||
entry->modifyingConnection = connectionReference;
|
|
||||||
connectionReference->hadDML = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return connectionReference->connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AssociatePlacementWithShard records shard->placement relation in
|
* AssociatePlacementWithShard records shard->placement relation in
|
||||||
* ConnectionShardHash.
|
* ConnectionShardHash.
|
||||||
|
@ -680,7 +759,7 @@ CloseShardPlacementAssociation(struct MultiConnection *connection)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Note that we don't reset ConnectionPlacementHashEntry's
|
* Note that we don't reset ConnectionPlacementHashEntry's
|
||||||
* modifyingConnection here, that'd be more complicated than it seems
|
* primaryConnection here, that'd be more complicated than it seems
|
||||||
* worth. That means we'll error out spuriously if a DML/DDL
|
* worth. That means we'll error out spuriously if a DML/DDL
|
||||||
* executing connection is closed earlier in a transaction.
|
* executing connection is closed earlier in a transaction.
|
||||||
*/
|
*/
|
||||||
|
@ -823,16 +902,17 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
|
||||||
{
|
{
|
||||||
ConnectionPlacementHashEntry *placementEntry =
|
ConnectionPlacementHashEntry *placementEntry =
|
||||||
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
|
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
|
||||||
ConnectionReference *modifyingConnection = placementEntry->modifyingConnection;
|
ConnectionReference *primaryConnection = placementEntry->primaryConnection;
|
||||||
MultiConnection *connection = NULL;
|
MultiConnection *connection = NULL;
|
||||||
|
|
||||||
/* we only consider shards that are modified */
|
/* we only consider shards that are modified */
|
||||||
if (modifyingConnection == NULL)
|
if (primaryConnection == NULL ||
|
||||||
|
!(primaryConnection->hadDDL || primaryConnection->hadDML))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
connection = modifyingConnection->connection;
|
connection = primaryConnection->connection;
|
||||||
|
|
||||||
if (!connection || connection->remoteTransaction.transactionFailed)
|
if (!connection || connection->remoteTransaction.transactionFailed)
|
||||||
{
|
{
|
||||||
|
|
|
@ -75,16 +75,17 @@ bool EnableDeadlockPrevention = true;
|
||||||
/* functions needed during run phase */
|
/* functions needed during run phase */
|
||||||
static void ReacquireMetadataLocks(List *taskList);
|
static void ReacquireMetadataLocks(List *taskList);
|
||||||
static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
|
static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
|
||||||
|
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
|
||||||
|
ShardPlacementAccessType accessType);
|
||||||
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
|
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
|
||||||
bool expectResults);
|
bool expectResults);
|
||||||
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
|
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
|
||||||
static List * GetModifyConnections(List *taskPlacementList, bool markCritical,
|
static List * GetModifyConnections(Task *task, bool markCritical,
|
||||||
bool startedInTransaction);
|
bool startedInTransaction);
|
||||||
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
|
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
|
||||||
bool isModificationQuery, bool expectResults);
|
bool isModificationQuery, bool expectResults);
|
||||||
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
|
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
|
||||||
ParamListInfo paramListInfo, CitusScanState *scanState);
|
ParamListInfo paramListInfo, CitusScanState *scanState);
|
||||||
static List * TaskShardIntervalList(List *taskList);
|
|
||||||
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
|
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
|
||||||
static void AcquireExecutorMultiShardLocks(List *taskList);
|
static void AcquireExecutorMultiShardLocks(List *taskList);
|
||||||
static bool RequiresConsistentSnapshot(Task *task);
|
static bool RequiresConsistentSnapshot(Task *task);
|
||||||
|
@ -601,13 +602,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
List *taskPlacementList = task->taskPlacementList;
|
List *taskPlacementList = task->taskPlacementList;
|
||||||
ListCell *taskPlacementCell = NULL;
|
ListCell *taskPlacementCell = NULL;
|
||||||
char *queryString = task->queryString;
|
char *queryString = task->queryString;
|
||||||
|
List *relationShardList = task->relationShardList;
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("single-shard query may not appear in transaction blocks "
|
|
||||||
"which contain multi-shard data modifications")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try to run the query to completion on one placement. If the query fails
|
* Try to run the query to completion on one placement. If the query fails
|
||||||
|
@ -620,8 +615,32 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
bool dontFailOnError = false;
|
bool dontFailOnError = false;
|
||||||
int64 currentAffectedTupleCount = 0;
|
int64 currentAffectedTupleCount = 0;
|
||||||
int connectionFlags = SESSION_LIFESPAN;
|
int connectionFlags = SESSION_LIFESPAN;
|
||||||
MultiConnection *connection =
|
List *placementAccessList = NIL;
|
||||||
GetPlacementConnection(connectionFlags, taskPlacement, NULL);
|
MultiConnection *connection = NULL;
|
||||||
|
|
||||||
|
if (list_length(relationShardList) > 0)
|
||||||
|
{
|
||||||
|
placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName,
|
||||||
|
taskPlacement->nodePort,
|
||||||
|
relationShardList);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* When the SELECT prunes down to 0 shards, just use the dummy placement.
|
||||||
|
*
|
||||||
|
* FIXME: it would be preferable to evaluate the SELECT locally since no
|
||||||
|
* data from the workers is required.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ShardPlacementAccess *placementAccess =
|
||||||
|
CreatePlacementAccess(taskPlacement, PLACEMENT_ACCESS_SELECT);
|
||||||
|
|
||||||
|
placementAccessList = list_make1(placementAccess);
|
||||||
|
}
|
||||||
|
|
||||||
|
connection = GetPlacementListConnection(connectionFlags, placementAccessList,
|
||||||
|
NULL);
|
||||||
|
|
||||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||||
if (!queryOK)
|
if (!queryOK)
|
||||||
|
@ -641,6 +660,56 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BuildPlacementSelectList builds a list of SELECT placement accesses
|
||||||
|
* which can be used to call StartPlacementListConnection or
|
||||||
|
* GetPlacementListConnection.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
BuildPlacementSelectList(char *nodeName, int nodePort, List *relationShardList)
|
||||||
|
{
|
||||||
|
ListCell *relationShardCell = NULL;
|
||||||
|
List *placementAccessList = NIL;
|
||||||
|
|
||||||
|
foreach(relationShardCell, relationShardList)
|
||||||
|
{
|
||||||
|
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
|
||||||
|
ShardPlacement *placement = NULL;
|
||||||
|
ShardPlacementAccess *placementAccess = NULL;
|
||||||
|
|
||||||
|
placement = FindShardPlacementOnNode(nodeName, nodePort, relationShard->shardId);
|
||||||
|
if (placement == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("no active placement of shard %ld found on node "
|
||||||
|
"%s:%d",
|
||||||
|
relationShard->shardId, nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
|
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
|
||||||
|
placementAccessList = lappend(placementAccessList, placementAccess);
|
||||||
|
}
|
||||||
|
|
||||||
|
return placementAccessList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreatePlacementAccess returns a new ShardPlacementAccess for the given placement
|
||||||
|
* and access type.
|
||||||
|
*/
|
||||||
|
static ShardPlacementAccess *
|
||||||
|
CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType)
|
||||||
|
{
|
||||||
|
ShardPlacementAccess *placementAccess = NULL;
|
||||||
|
|
||||||
|
placementAccess = (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));
|
||||||
|
placementAccess->placement = placement;
|
||||||
|
placementAccess->accessType = accessType;
|
||||||
|
|
||||||
|
return placementAccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExecuteSingleModifyTask executes the task on the remote node, retrieves the
|
* ExecuteSingleModifyTask executes the task on the remote node, retrieves the
|
||||||
* results and stores them, if RETURNING is used, in a tuple store.
|
* results and stores them, if RETURNING is used, in a tuple store.
|
||||||
|
@ -669,14 +738,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
||||||
bool startedInTransaction =
|
bool startedInTransaction =
|
||||||
InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
|
InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("single-shard DML commands must not appear in "
|
|
||||||
"transaction blocks which contain multi-shard data "
|
|
||||||
"modifications")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Modifications for reference tables are always done using 2PC. First
|
* Modifications for reference tables are always done using 2PC. First
|
||||||
* ensure that distributed transaction is started. Then force the
|
* ensure that distributed transaction is started. Then force the
|
||||||
|
@ -705,7 +766,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
||||||
* establish the connection, mark as critical (when modifying reference
|
* establish the connection, mark as critical (when modifying reference
|
||||||
* table) and start a transaction (when in a transaction).
|
* table) and start a transaction (when in a transaction).
|
||||||
*/
|
*/
|
||||||
connectionList = GetModifyConnections(taskPlacementList,
|
connectionList = GetModifyConnections(task,
|
||||||
taskRequiresTwoPhaseCommit,
|
taskRequiresTwoPhaseCommit,
|
||||||
startedInTransaction);
|
startedInTransaction);
|
||||||
|
|
||||||
|
@ -808,10 +869,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
||||||
* transaction in progress.
|
* transaction in progress.
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions)
|
GetModifyConnections(Task *task, bool markCritical, bool noNewTransactions)
|
||||||
{
|
{
|
||||||
|
List *taskPlacementList = task->taskPlacementList;
|
||||||
ListCell *taskPlacementCell = NULL;
|
ListCell *taskPlacementCell = NULL;
|
||||||
List *multiConnectionList = NIL;
|
List *multiConnectionList = NIL;
|
||||||
|
List *relationShardList = task->relationShardList;
|
||||||
|
|
||||||
/* first initiate connection establishment for all necessary connections */
|
/* first initiate connection establishment for all necessary connections */
|
||||||
foreach(taskPlacementCell, taskPlacementList)
|
foreach(taskPlacementCell, taskPlacementList)
|
||||||
|
@ -819,14 +882,22 @@ GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTrans
|
||||||
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
||||||
int connectionFlags = SESSION_LIFESPAN | FOR_DML;
|
int connectionFlags = SESSION_LIFESPAN | FOR_DML;
|
||||||
MultiConnection *multiConnection = NULL;
|
MultiConnection *multiConnection = NULL;
|
||||||
|
List *placementAccessList = NIL;
|
||||||
|
ShardPlacementAccess *placementModification = NULL;
|
||||||
|
|
||||||
/*
|
/* create placement accesses for placements that appear in a subselect */
|
||||||
* FIXME: It's not actually correct to use only one shard placement
|
placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName,
|
||||||
* here for router queries involving multiple relations. We should
|
taskPlacement->nodePort,
|
||||||
* check that this connection is the only modifying one associated
|
relationShardList);
|
||||||
* with all the involved shards.
|
|
||||||
*/
|
/* create placement access for the placement that we're modifying */
|
||||||
multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL);
|
placementModification = CreatePlacementAccess(taskPlacement,
|
||||||
|
PLACEMENT_ACCESS_DML);
|
||||||
|
placementAccessList = lappend(placementAccessList, placementModification);
|
||||||
|
|
||||||
|
/* get an appropriate connection for the DML statement */
|
||||||
|
multiConnection = GetPlacementListConnection(connectionFlags, placementAccessList,
|
||||||
|
NULL);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If already in a transaction, disallow expanding set of remote
|
* If already in a transaction, disallow expanding set of remote
|
||||||
|
@ -943,7 +1014,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
ListCell *taskCell = NULL;
|
ListCell *taskCell = NULL;
|
||||||
Task *firstTask = NULL;
|
Task *firstTask = NULL;
|
||||||
int connectionFlags = 0;
|
int connectionFlags = 0;
|
||||||
List *shardIntervalList = NIL;
|
|
||||||
List *affectedTupleCountList = NIL;
|
List *affectedTupleCountList = NIL;
|
||||||
HTAB *shardConnectionHash = NULL;
|
HTAB *shardConnectionHash = NULL;
|
||||||
bool tasksPending = true;
|
bool tasksPending = true;
|
||||||
|
@ -954,16 +1024,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("multi-shard data modifications must not appear in "
|
|
||||||
"transaction blocks which contain single-shard DML "
|
|
||||||
"commands")));
|
|
||||||
}
|
|
||||||
|
|
||||||
shardIntervalList = TaskShardIntervalList(taskList);
|
|
||||||
|
|
||||||
/* ensure that there are no concurrent modifications on the same shards */
|
/* ensure that there are no concurrent modifications on the same shards */
|
||||||
AcquireExecutorMultiShardLocks(taskList);
|
AcquireExecutorMultiShardLocks(taskList);
|
||||||
|
|
||||||
|
@ -987,10 +1047,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
}
|
}
|
||||||
|
|
||||||
/* open connection to all relevant placements, if not already open */
|
/* open connection to all relevant placements, if not already open */
|
||||||
shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
|
shardConnectionHash = OpenTransactionsForAllTasks(taskList, connectionFlags);
|
||||||
connectionFlags);
|
|
||||||
|
|
||||||
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
|
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||||
|
|
||||||
/* iterate over placements in rounds, to ensure in-order execution */
|
/* iterate over placements in rounds, to ensure in-order execution */
|
||||||
while (tasksPending)
|
while (tasksPending)
|
||||||
|
@ -1126,29 +1185,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TaskShardIntervalList returns a list of shard intervals for a given list of
|
|
||||||
* tasks.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
TaskShardIntervalList(List *taskList)
|
|
||||||
{
|
|
||||||
ListCell *taskCell = NULL;
|
|
||||||
List *shardIntervalList = NIL;
|
|
||||||
|
|
||||||
foreach(taskCell, taskList)
|
|
||||||
{
|
|
||||||
Task *task = (Task *) lfirst(taskCell);
|
|
||||||
int64 shardId = task->anchorShardId;
|
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
|
||||||
|
|
||||||
shardIntervalList = lappend(shardIntervalList, shardInterval);
|
|
||||||
}
|
|
||||||
|
|
||||||
return shardIntervalList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendQueryInSingleRowMode sends the given query on the connection in an
|
* SendQueryInSingleRowMode sends the given query on the connection in an
|
||||||
* asynchronous way. The function also sets the single-row mode on the
|
* asynchronous way. The function also sets the single-row mode on the
|
||||||
|
|
|
@ -2494,14 +2494,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
{
|
{
|
||||||
bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
|
bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("distributed DDL commands must not appear within "
|
|
||||||
"transaction blocks containing single-shard data "
|
|
||||||
"modifications")));
|
|
||||||
}
|
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
if (!ddlJob->concurrentIndexCmd)
|
if (!ddlJob->concurrentIndexCmd)
|
||||||
|
|
|
@ -146,7 +146,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery)
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
uint64 totalRelationSize = 0;
|
uint64 totalRelationSize = 0;
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
errmsg("citus size functions cannot be called in transaction"
|
errmsg("citus size functions cannot be called in transaction"
|
||||||
|
|
|
@ -210,7 +210,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relation
|
||||||
task = CitusMakeNode(Task);
|
task = CitusMakeNode(Task);
|
||||||
task->jobId = jobId;
|
task->jobId = jobId;
|
||||||
task->taskId = taskId++;
|
task->taskId = taskId++;
|
||||||
task->taskType = SQL_TASK;
|
task->taskType = MODIFY_TASK;
|
||||||
task->queryString = shardQueryString->data;
|
task->queryString = shardQueryString->data;
|
||||||
task->dependedTaskList = NULL;
|
task->dependedTaskList = NULL;
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/multi_router_executor.h"
|
||||||
#include "distributed/multi_shard_transaction.h"
|
#include "distributed/multi_shard_transaction.h"
|
||||||
#include "distributed/placement_connection.h"
|
#include "distributed/placement_connection.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
|
@ -30,25 +31,27 @@
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* OpenTransactionsToAllShardPlacements opens connections to all placements
|
* OpenTransactionsForAllTasks opens a connection for each task,
|
||||||
* using the provided shard identifier list and returns it as a shard ID ->
|
* taking into account which shards are read and modified by the task
|
||||||
* ShardConnections hash. connectionFlags can be used to specify whether
|
* to select the appropriate connection, or error out if no appropriate
|
||||||
* the command is FOR_DML or FOR_DDL.
|
* connection can be found. The set of connections is returned as an
|
||||||
|
* anchor shard ID -> ShardConnections hash.
|
||||||
*/
|
*/
|
||||||
HTAB *
|
HTAB *
|
||||||
OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlags)
|
OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
|
||||||
{
|
{
|
||||||
HTAB *shardConnectionHash = NULL;
|
HTAB *shardConnectionHash = NULL;
|
||||||
ListCell *shardIntervalCell = NULL;
|
ListCell *taskCell = NULL;
|
||||||
List *newConnectionList = NIL;
|
List *newConnectionList = NIL;
|
||||||
|
|
||||||
shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext);
|
shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext);
|
||||||
|
|
||||||
/* open connections to shards which don't have connections yet */
|
/* open connections to shards which don't have connections yet */
|
||||||
foreach(shardIntervalCell, shardIntervalList)
|
foreach(taskCell, taskList)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
Task *task = (Task *) lfirst(taskCell);
|
||||||
uint64 shardId = shardInterval->shardId;
|
ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT;
|
||||||
|
uint64 shardId = task->anchorShardId;
|
||||||
ShardConnections *shardConnections = NULL;
|
ShardConnections *shardConnections = NULL;
|
||||||
bool shardConnectionsFound = false;
|
bool shardConnectionsFound = false;
|
||||||
List *shardPlacementList = NIL;
|
List *shardPlacementList = NIL;
|
||||||
|
@ -69,9 +72,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag
|
||||||
UINT64_FORMAT, shardId)));
|
UINT64_FORMAT, shardId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (task->taskType == MODIFY_TASK)
|
||||||
|
{
|
||||||
|
accessType = PLACEMENT_ACCESS_DML;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* can only open connections for DDL and DML commands */
|
||||||
|
Assert(task->taskType == DDL_TASK);
|
||||||
|
|
||||||
|
accessType = PLACEMENT_ACCESS_DDL;
|
||||||
|
}
|
||||||
|
|
||||||
foreach(placementCell, shardPlacementList)
|
foreach(placementCell, shardPlacementList)
|
||||||
{
|
{
|
||||||
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
|
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
|
||||||
|
ShardPlacementAccess placementModification;
|
||||||
|
List *placementAccessList = NIL;
|
||||||
|
List *placementSelectList = NIL;
|
||||||
MultiConnection *connection = NULL;
|
MultiConnection *connection = NULL;
|
||||||
|
|
||||||
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
|
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
|
||||||
|
@ -83,9 +101,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag
|
||||||
shardPlacement->nodePort)));
|
shardPlacement->nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
connection = StartPlacementConnection(connectionFlags,
|
/* add placement access for modification */
|
||||||
shardPlacement,
|
placementModification.placement = shardPlacement;
|
||||||
NULL);
|
placementModification.accessType = accessType;
|
||||||
|
|
||||||
|
placementAccessList = lappend(placementAccessList, &placementModification);
|
||||||
|
|
||||||
|
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
|
||||||
|
placementSelectList = BuildPlacementSelectList(shardPlacement->nodeName,
|
||||||
|
shardPlacement->nodePort,
|
||||||
|
task->relationShardList);
|
||||||
|
placementAccessList = list_concat(placementAccessList, placementSelectList);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Find a connection that sees preceding writes and cannot self-deadlock,
|
||||||
|
* or error out if no such connection exists.
|
||||||
|
*/
|
||||||
|
connection = StartPlacementListConnection(connectionFlags,
|
||||||
|
placementAccessList, NULL);
|
||||||
|
|
||||||
ClaimConnectionExclusively(connection);
|
ClaimConnectionExclusively(connection);
|
||||||
|
|
||||||
|
|
|
@ -356,6 +356,43 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindShardPlacementOnNode returns the shard placement for the given shard
|
||||||
|
* on the given node, or returns NULL of no placement for the shard exists
|
||||||
|
* on the node.
|
||||||
|
*/
|
||||||
|
ShardPlacement *
|
||||||
|
FindShardPlacementOnNode(char *nodeName, int nodePort, uint64 shardId)
|
||||||
|
{
|
||||||
|
ShardCacheEntry *shardEntry = NULL;
|
||||||
|
DistTableCacheEntry *tableEntry = NULL;
|
||||||
|
ShardPlacement *placementArray = NULL;
|
||||||
|
int numberOfPlacements = 0;
|
||||||
|
ShardPlacement *placementOnNode = NULL;
|
||||||
|
int placementIndex = 0;
|
||||||
|
|
||||||
|
shardEntry = LookupShardCacheEntry(shardId);
|
||||||
|
tableEntry = shardEntry->tableEntry;
|
||||||
|
placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex];
|
||||||
|
numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex];
|
||||||
|
|
||||||
|
for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
|
||||||
|
{
|
||||||
|
ShardPlacement *placement = &placementArray[placementIndex];
|
||||||
|
|
||||||
|
if (strncmp(nodeName, placement->nodeName, WORKER_LENGTH) == 0 &&
|
||||||
|
nodePort == placement->nodePort)
|
||||||
|
{
|
||||||
|
placementOnNode = CitusMakeNode(ShardPlacement);
|
||||||
|
CopyShardPlacement(placement, placementOnNode);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return placementOnNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShardPlacementList returns the list of placements for the given shard from
|
* ShardPlacementList returns the list of placements for the given shard from
|
||||||
* the cache.
|
* the cache.
|
||||||
|
|
|
@ -70,6 +70,8 @@ extern bool IsDistributedTable(Oid relationId);
|
||||||
extern List * DistributedTableList(void);
|
extern List * DistributedTableList(void);
|
||||||
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
||||||
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
|
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
|
||||||
|
extern ShardPlacement * FindShardPlacementOnNode(char *nodeName, int nodePort,
|
||||||
|
uint64 shardId);
|
||||||
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
|
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
|
||||||
extern int GetLocalGroupId(void);
|
extern int GetLocalGroupId(void);
|
||||||
extern List * DistTableOidList(void);
|
extern List * DistTableOidList(void);
|
||||||
|
|
|
@ -186,7 +186,7 @@ typedef struct Task
|
||||||
char replicationModel; /* only applies to modify tasks */
|
char replicationModel; /* only applies to modify tasks */
|
||||||
|
|
||||||
bool insertSelectQuery;
|
bool insertSelectQuery;
|
||||||
List *relationShardList; /* only applies INSERT/SELECT tasks */
|
List *relationShardList;
|
||||||
} Task;
|
} Task;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -43,5 +43,7 @@ extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
|
||||||
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
|
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
|
||||||
extern void ExecuteTasksSequentiallyWithoutResults(List *taskList);
|
extern void ExecuteTasksSequentiallyWithoutResults(List *taskList);
|
||||||
|
|
||||||
|
extern List * BuildPlacementSelectList(char *nodeName, int nodePort,
|
||||||
|
List *relationShardList);
|
||||||
|
|
||||||
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
||||||
|
|
|
@ -27,8 +27,7 @@ typedef struct ShardConnections
|
||||||
} ShardConnections;
|
} ShardConnections;
|
||||||
|
|
||||||
|
|
||||||
extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList,
|
extern HTAB * OpenTransactionsForAllTasks(List *taskList, int connectionFlags);
|
||||||
int connectionFlags);
|
|
||||||
extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
|
extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
|
||||||
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
|
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
|
||||||
bool *connectionsFound);
|
bool *connectionsFound);
|
||||||
|
|
|
@ -15,6 +15,30 @@
|
||||||
/* forward declare, to avoid dependency on ShardPlacement definition */
|
/* forward declare, to avoid dependency on ShardPlacement definition */
|
||||||
struct ShardPlacement;
|
struct ShardPlacement;
|
||||||
|
|
||||||
|
/* represents the way in which a placement is accessed */
|
||||||
|
typedef enum ShardPlacementAccessType
|
||||||
|
{
|
||||||
|
/* read from placement */
|
||||||
|
PLACEMENT_ACCESS_SELECT,
|
||||||
|
|
||||||
|
/* modify rows in placement */
|
||||||
|
PLACEMENT_ACCESS_DML,
|
||||||
|
|
||||||
|
/* modify placement schema */
|
||||||
|
PLACEMENT_ACCESS_DDL
|
||||||
|
} ShardPlacementAccessType;
|
||||||
|
|
||||||
|
/* represents access to a placement */
|
||||||
|
typedef struct ShardPlacementAccess
|
||||||
|
{
|
||||||
|
/* placement that is accessed */
|
||||||
|
struct ShardPlacement *placement;
|
||||||
|
|
||||||
|
/* the way in which the placement is accessed */
|
||||||
|
ShardPlacementAccessType accessType;
|
||||||
|
} ShardPlacementAccess;
|
||||||
|
|
||||||
|
|
||||||
extern MultiConnection * GetPlacementConnection(uint32 flags,
|
extern MultiConnection * GetPlacementConnection(uint32 flags,
|
||||||
struct ShardPlacement *placement,
|
struct ShardPlacement *placement,
|
||||||
const char *userName);
|
const char *userName);
|
||||||
|
@ -22,6 +46,13 @@ extern MultiConnection * StartPlacementConnection(uint32 flags,
|
||||||
struct ShardPlacement *placement,
|
struct ShardPlacement *placement,
|
||||||
const char *userName);
|
const char *userName);
|
||||||
|
|
||||||
|
extern MultiConnection * GetPlacementListConnection(uint32 flags,
|
||||||
|
List *placementAccessList,
|
||||||
|
const char *userName);
|
||||||
|
extern MultiConnection * StartPlacementListConnection(uint32 flags,
|
||||||
|
List *placementAccessList,
|
||||||
|
const char *userName);
|
||||||
|
|
||||||
extern void ResetPlacementConnectionManagement(void);
|
extern void ResetPlacementConnectionManagement(void);
|
||||||
extern void MarkFailedShardPlacements(void);
|
extern void MarkFailedShardPlacements(void);
|
||||||
extern void PostCommitMarkFailedShardPlacements(bool using2PC);
|
extern void PostCommitMarkFailedShardPlacements(bool using2PC);
|
||||||
|
|
|
@ -441,9 +441,9 @@ SELECT create_distributed_table('products', 'product_no');
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO products VALUES(1,'product_1', 5);
|
INSERT INTO products VALUES(1,'product_1', 5);
|
||||||
-- Should error out since conflicts with the above single-shard data modification command.
|
-- DDL may error out after an INSERT because it might pick the wrong connection
|
||||||
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
ERROR: cannot establish a new connection for placement 2327, since DML has been executed on a connection that is in use
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- Add constraints
|
-- Add constraints
|
||||||
|
@ -451,9 +451,7 @@ ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
||||||
ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price);
|
ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price);
|
||||||
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
||||||
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
||||||
-- Single shard DML command can't be located in the same transaction with above commands.
|
|
||||||
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- There should be no constraint on master and worker(s)
|
-- There should be no constraint on master and worker(s)
|
||||||
SELECT "Constraint", "Definition" FROM table_checks WHERE relid='products'::regclass;
|
SELECT "Constraint", "Definition" FROM table_checks WHERE relid='products'::regclass;
|
||||||
|
|
|
@ -1636,15 +1636,24 @@ ROLLBACK;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE raw_events_second DROP COLUMN value_4;
|
ALTER TABLE raw_events_second DROP COLUMN value_4;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Insert after copy is currently disallowed because of the way the
|
-- Altering a reference table and then performing an INSERT ... SELECT which
|
||||||
-- transaction modification state is currently handled. Copy is also
|
-- joins with the reference table is not allowed, since the INSERT ... SELECT
|
||||||
-- rolled back.
|
-- would read from the reference table over others connections than the ones
|
||||||
|
-- that performed the DDL.
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE reference_table ADD COLUMN z int;
|
||||||
|
INSERT INTO raw_events_first (user_id)
|
||||||
|
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
|
||||||
|
ERROR: cannot establish a new connection for placement 655, since DDL has been executed on a connection that is in use
|
||||||
|
ROLLBACK;
|
||||||
|
-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses
|
||||||
|
-- to use a connection for one shard, while the connection already modified
|
||||||
|
-- another shard.
|
||||||
BEGIN;
|
BEGIN;
|
||||||
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
||||||
ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands
|
ERROR: cannot establish a new connection for placement 636, since DML has been executed on a connection that is in use
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Insert after copy is currently allowed for single-shard operation.
|
-- Insert after copy is currently allowed for single-shard operation.
|
||||||
-- Both insert and copy are rolled back successfully.
|
-- Both insert and copy are rolled back successfully.
|
||||||
|
@ -1658,7 +1667,6 @@ SELECT user_id FROM raw_events_first WHERE user_id = 101;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Copy after insert is currently disallowed.
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
||||||
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
|
|
|
@ -172,18 +172,16 @@ SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ABORT;
|
ABORT;
|
||||||
-- applies to DDL, too
|
-- we can mix DDL and INSERT
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
ABORT;
|
||||||
COMMIT;
|
|
||||||
-- whether it occurs first or second
|
-- whether it occurs first or second
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
ABORT;
|
||||||
COMMIT;
|
|
||||||
-- but the DDL should correctly roll back
|
-- but the DDL should correctly roll back
|
||||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
||||||
Column | Type | Modifiers
|
Column | Type | Modifiers
|
||||||
|
@ -207,7 +205,7 @@ BEGIN;
|
||||||
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald');
|
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald');
|
||||||
INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
|
INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
|
||||||
\copy researchers from stdin delimiter ','
|
\copy researchers from stdin delimiter ','
|
||||||
ERROR: cannot establish new placement connection when DML has been executed on existing placement connection
|
ERROR: cannot establish a new connection for placement 2704, since DML has been executed on a connection that is in use
|
||||||
CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie"
|
CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie"
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- after a COPY you can modify multiple shards, since they'll use different connections
|
-- after a COPY you can modify multiple shards, since they'll use different connections
|
||||||
|
@ -383,16 +381,29 @@ ORDER BY nodeport;
|
||||||
localhost | 57638 | t | DROP FUNCTION
|
localhost | 57638 | t | DROP FUNCTION
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- ALTER TABLE and COPY are compatible if ALTER TABLE precedes COPY
|
-- ALTER and copy are compatible
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- but not if COPY precedes ALTER TABLE
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
ABORT;
|
||||||
|
-- cannot perform DDL once a connection is used for multiple shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
|
||||||
|
lab_id
|
||||||
|
--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
|
||||||
|
lab_id
|
||||||
|
--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
ALTER TABLE researchers ADD COLUMN motto text;
|
||||||
|
ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- multi-shard operations can co-exist with DDL in a transactional way
|
-- multi-shard operations can co-exist with DDL in a transactional way
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
@ -1283,3 +1294,212 @@ SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
DROP USER test_user;
|
DROP USER test_user;
|
||||||
|
-- set up foreign keys to test transactions with co-located and reference tables
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.shard_replication_factor TO 1;
|
||||||
|
SET LOCAL citus.shard_count TO 4;
|
||||||
|
CREATE TABLE usergroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('usergroups');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE itemgroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('itemgroups');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE users (
|
||||||
|
id int PRIMARY KEY,
|
||||||
|
name text,
|
||||||
|
user_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('users', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE items (
|
||||||
|
user_id int REFERENCES users (id) ON DELETE CASCADE,
|
||||||
|
item_name text,
|
||||||
|
item_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('items', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Table to find values that live in different shards on the same node
|
||||||
|
SELECT id, shard_name('users', shardid), nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
JOIN
|
||||||
|
( SELECT id, get_shard_id_for_distribution_column('users', id) shardid FROM generate_series(1,10) id ) ids
|
||||||
|
USING (shardid)
|
||||||
|
ORDER BY
|
||||||
|
id;
|
||||||
|
id | shard_name | nodename | nodeport
|
||||||
|
----+----------------------+-----------+----------
|
||||||
|
1 | public.users_1200022 | localhost | 57637
|
||||||
|
2 | public.users_1200025 | localhost | 57638
|
||||||
|
3 | public.users_1200023 | localhost | 57638
|
||||||
|
4 | public.users_1200023 | localhost | 57638
|
||||||
|
5 | public.users_1200022 | localhost | 57637
|
||||||
|
6 | public.users_1200024 | localhost | 57637
|
||||||
|
7 | public.users_1200023 | localhost | 57638
|
||||||
|
8 | public.users_1200022 | localhost | 57637
|
||||||
|
9 | public.users_1200025 | localhost | 57638
|
||||||
|
10 | public.users_1200022 | localhost | 57637
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- the INSERTs into items should see the users
|
||||||
|
BEGIN;
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
INSERT INTO items VALUES (1, 'item-1');
|
||||||
|
INSERT INTO items VALUES (6, 'item-6');
|
||||||
|
END;
|
||||||
|
SELECT user_id FROM items ORDER BY user_id;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
1
|
||||||
|
6
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- should not be able to open multiple connections per node after INSERTing over one connection
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO users VALUES (2, 'burak');
|
||||||
|
INSERT INTO users VALUES (3, 'burak');
|
||||||
|
\COPY items FROM STDIN WITH CSV
|
||||||
|
ERROR: cannot establish a new connection for placement 2743, since DML has been executed on a connection that is in use
|
||||||
|
END;
|
||||||
|
-- cannot perform DDL after a co-located table has been read over 1 connection
|
||||||
|
BEGIN;
|
||||||
|
SELECT id FROM users WHERE id = 1;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT id FROM users WHERE id = 6;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
|
ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection
|
||||||
|
END;
|
||||||
|
-- but the other way around is fine
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 1;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 6;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- now read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- perform a DDL command on the reference table
|
||||||
|
ALTER TABLE itemgroups ADD COLUMN last_update timestamptz;
|
||||||
|
ERROR: cannot perform DDL on placement 2737, which has been read over multiple connections
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- perform a DDL command on a co-located reference table
|
||||||
|
ALTER TABLE usergroups ADD COLUMN last_update timestamptz;
|
||||||
|
ERROR: cannot perform DDL on placement 2735 since a co-located placement has been read over multiple connections
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- make a modification over connection 1
|
||||||
|
INSERT INTO usergroups VALUES (0,'istanbul');
|
||||||
|
-- copy over connections 1 and 2
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- cannot read modifications made over different connections
|
||||||
|
SELECT id FROM users JOIN usergroups ON (gid = user_group) WHERE id = 3;
|
||||||
|
ERROR: cannot perform query with placements that were modified over multiple connections
|
||||||
|
END;
|
||||||
|
-- make sure we can see cascading deletes
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('DELETE FROM users');
|
||||||
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 1;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 6;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- test visibility after COPY
|
||||||
|
INSERT INTO usergroups VALUES (2,'group');
|
||||||
|
BEGIN;
|
||||||
|
-- opens two separate connections to node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- Uses first connection, which wrote the row with id = 2
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2;
|
||||||
|
id | name | user_group | gid | name
|
||||||
|
----+-------+------------+-----+-------
|
||||||
|
2 | onder | 2 | 2 | group
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Should use second connection, which wrote the row with id = 4
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
|
||||||
|
id | name | user_group | gid | name
|
||||||
|
----+-------+------------+-----+-------
|
||||||
|
4 | murat | 2 | 2 | group
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
DROP TABLE items, users, itemgroups, usergroups;
|
||||||
|
|
|
@ -179,12 +179,6 @@ SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ABORT;
|
ABORT;
|
||||||
-- applies to DDL
|
|
||||||
BEGIN;
|
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
|
||||||
ALTER TABLE labs_mx ADD COLUMN motto text;
|
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
|
||||||
COMMIT;
|
|
||||||
-- doesn't apply to COPY after modifications
|
-- doesn't apply to COPY after modifications
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
||||||
|
|
|
@ -1609,19 +1609,22 @@ SELECT * FROM reference_table_test;
|
||||||
10 | 2 | 2 | Fri Dec 02 00:00:00 2016
|
10 | 2 | 2 | Fri Dec 02 00:00:00 2016
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- do not allow mixing transactions
|
-- DML+master_modify_multiple_shards is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
||||||
ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
10
|
||||||
|
(1 row)
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Do not allow DDL and modification in the same transaction
|
-- DDL+DML is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
||||||
NOTICE: using one-phase commit for distributed DDL commands
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- clean up tables
|
-- clean up tables
|
||||||
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
|
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
|
||||||
|
|
|
@ -382,7 +382,7 @@ SELECT create_distributed_table('products', 'product_no');
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO products VALUES(1,'product_1', 5);
|
INSERT INTO products VALUES(1,'product_1', 5);
|
||||||
|
|
||||||
-- Should error out since conflicts with the above single-shard data modification command.
|
-- DDL may error out after an INSERT because it might pick the wrong connection
|
||||||
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
@ -393,7 +393,6 @@ ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price);
|
||||||
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
||||||
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
||||||
|
|
||||||
-- Single shard DML command can't be located in the same transaction with above commands.
|
|
||||||
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
|
@ -1332,9 +1332,19 @@ ALTER TABLE raw_events_second DROP COLUMN value_4;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- Insert after copy is currently disallowed because of the way the
|
-- Altering a reference table and then performing an INSERT ... SELECT which
|
||||||
-- transaction modification state is currently handled. Copy is also
|
-- joins with the reference table is not allowed, since the INSERT ... SELECT
|
||||||
-- rolled back.
|
-- would read from the reference table over others connections than the ones
|
||||||
|
-- that performed the DDL.
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE reference_table ADD COLUMN z int;
|
||||||
|
INSERT INTO raw_events_first (user_id)
|
||||||
|
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses
|
||||||
|
-- to use a connection for one shard, while the connection already modified
|
||||||
|
-- another shard.
|
||||||
BEGIN;
|
BEGIN;
|
||||||
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
100,100
|
100,100
|
||||||
|
@ -1352,7 +1362,6 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101
|
||||||
SELECT user_id FROM raw_events_first WHERE user_id = 101;
|
SELECT user_id FROM raw_events_first WHERE user_id = 101;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- Copy after insert is currently disallowed.
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
||||||
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
|
|
|
@ -138,17 +138,17 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
-- applies to DDL, too
|
-- we can mix DDL and INSERT
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
COMMIT;
|
ABORT;
|
||||||
|
|
||||||
-- whether it occurs first or second
|
-- whether it occurs first or second
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
COMMIT;
|
ABORT;
|
||||||
|
|
||||||
-- but the DDL should correctly roll back
|
-- but the DDL should correctly roll back
|
||||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
||||||
|
@ -289,7 +289,7 @@ ORDER BY nodeport, shardid;
|
||||||
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
|
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
|
||||||
ORDER BY nodeport;
|
ORDER BY nodeport;
|
||||||
|
|
||||||
-- ALTER TABLE and COPY are compatible if ALTER TABLE precedes COPY
|
-- ALTER and copy are compatible
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
|
@ -297,12 +297,18 @@ ALTER TABLE labs ADD COLUMN motto text;
|
||||||
\.
|
\.
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- but not if COPY precedes ALTER TABLE
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
12,fsociety
|
12,fsociety
|
||||||
\.
|
\.
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
|
-- cannot perform DDL once a connection is used for multiple shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
|
||||||
|
ALTER TABLE researchers ADD COLUMN motto text;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- multi-shard operations can co-exist with DDL in a transactional way
|
-- multi-shard operations can co-exist with DDL in a transactional way
|
||||||
|
@ -944,3 +950,147 @@ DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts
|
||||||
|
|
||||||
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
DROP USER test_user;
|
DROP USER test_user;
|
||||||
|
|
||||||
|
-- set up foreign keys to test transactions with co-located and reference tables
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.shard_replication_factor TO 1;
|
||||||
|
SET LOCAL citus.shard_count TO 4;
|
||||||
|
|
||||||
|
CREATE TABLE usergroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('usergroups');
|
||||||
|
|
||||||
|
CREATE TABLE itemgroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('itemgroups');
|
||||||
|
|
||||||
|
CREATE TABLE users (
|
||||||
|
id int PRIMARY KEY,
|
||||||
|
name text,
|
||||||
|
user_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('users', 'id');
|
||||||
|
|
||||||
|
CREATE TABLE items (
|
||||||
|
user_id int REFERENCES users (id) ON DELETE CASCADE,
|
||||||
|
item_name text,
|
||||||
|
item_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('items', 'user_id');
|
||||||
|
|
||||||
|
-- Table to find values that live in different shards on the same node
|
||||||
|
SELECT id, shard_name('users', shardid), nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
JOIN
|
||||||
|
( SELECT id, get_shard_id_for_distribution_column('users', id) shardid FROM generate_series(1,10) id ) ids
|
||||||
|
USING (shardid)
|
||||||
|
ORDER BY
|
||||||
|
id;
|
||||||
|
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- the INSERTs into items should see the users
|
||||||
|
BEGIN;
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
1,brian,0
|
||||||
|
6,metin,0
|
||||||
|
\.
|
||||||
|
INSERT INTO items VALUES (1, 'item-1');
|
||||||
|
INSERT INTO items VALUES (6, 'item-6');
|
||||||
|
END;
|
||||||
|
|
||||||
|
SELECT user_id FROM items ORDER BY user_id;
|
||||||
|
|
||||||
|
-- should not be able to open multiple connections per node after INSERTing over one connection
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO users VALUES (2, 'burak');
|
||||||
|
INSERT INTO users VALUES (3, 'burak');
|
||||||
|
\COPY items FROM STDIN WITH CSV
|
||||||
|
2,item-2,0
|
||||||
|
3,item-3,0
|
||||||
|
\.
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- cannot perform DDL after a co-located table has been read over 1 connection
|
||||||
|
BEGIN;
|
||||||
|
SELECT id FROM users WHERE id = 1;
|
||||||
|
SELECT id FROM users WHERE id = 6;
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- but the other way around is fine
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 1;
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 6;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,burak,0
|
||||||
|
3,burak,0
|
||||||
|
\.
|
||||||
|
-- now read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
-- perform a DDL command on the reference table
|
||||||
|
ALTER TABLE itemgroups ADD COLUMN last_update timestamptz;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,burak,0
|
||||||
|
3,burak,0
|
||||||
|
\.
|
||||||
|
-- read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
-- perform a DDL command on a co-located reference table
|
||||||
|
ALTER TABLE usergroups ADD COLUMN last_update timestamptz;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- make a modification over connection 1
|
||||||
|
INSERT INTO usergroups VALUES (0,'istanbul');
|
||||||
|
-- copy over connections 1 and 2
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,burak,0
|
||||||
|
3,burak,0
|
||||||
|
\.
|
||||||
|
-- cannot read modifications made over different connections
|
||||||
|
SELECT id FROM users JOIN usergroups ON (gid = user_group) WHERE id = 3;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- make sure we can see cascading deletes
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('DELETE FROM users');
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 1;
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 6;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- test visibility after COPY
|
||||||
|
|
||||||
|
INSERT INTO usergroups VALUES (2,'group');
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- opens two separate connections to node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,onder,2
|
||||||
|
4,murat,2
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Uses first connection, which wrote the row with id = 2
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2;
|
||||||
|
|
||||||
|
-- Should use second connection, which wrote the row with id = 4
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
|
||||||
|
END;
|
||||||
|
|
||||||
|
DROP TABLE items, users, itemgroups, usergroups;
|
||||||
|
|
|
@ -150,12 +150,6 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
||||||
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
|
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
-- applies to DDL
|
|
||||||
BEGIN;
|
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
|
||||||
ALTER TABLE labs_mx ADD COLUMN motto text;
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
-- doesn't apply to COPY after modifications
|
-- doesn't apply to COPY after modifications
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
||||||
|
|
|
@ -994,13 +994,13 @@ UPDATE reference_table_test SET value_1 = 10 WHERE value_1 = 2;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SELECT * FROM reference_table_test;
|
SELECT * FROM reference_table_test;
|
||||||
|
|
||||||
-- do not allow mixing transactions
|
-- DML+master_modify_multiple_shards is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- Do not allow DDL and modification in the same transaction
|
-- DDL+DML is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
|
|
Loading…
Reference in New Issue