mirror of https://github.com/citusdata/citus.git
Merge pull request #1785 from citusdata/real_time_xact
Make real-time executor work in transactions (and fix pg_partman)pull/1839/head
commit
906dadddb7
|
@ -821,10 +821,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
|
||||||
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
|
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
|
||||||
char *nodeUser = CurrentUserName();
|
char *nodeUser = CurrentUserName();
|
||||||
MultiConnection *connection = NULL;
|
MultiConnection *connection = NULL;
|
||||||
uint32 connectionFlags = FOR_DML | CONNECTION_PER_PLACEMENT;
|
uint32 connectionFlags = FOR_DML;
|
||||||
StringInfo copyCommand = NULL;
|
StringInfo copyCommand = NULL;
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure we use a separate connection per placement for hash-distributed
|
||||||
|
* tables in order to allow multi-shard modifications in the same transaction.
|
||||||
|
*/
|
||||||
|
if (placement->partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
connectionFlags |= CONNECTION_PER_PLACEMENT;
|
||||||
|
}
|
||||||
|
|
||||||
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
|
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
|
||||||
|
|
||||||
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||||
|
|
|
@ -54,6 +54,9 @@ typedef struct ConnectionReference
|
||||||
uint32 colocationGroupId;
|
uint32 colocationGroupId;
|
||||||
uint32 representativeValue;
|
uint32 representativeValue;
|
||||||
|
|
||||||
|
/* placementId of the placement, used only for append distributed tables */
|
||||||
|
uint64 placementId;
|
||||||
|
|
||||||
/* membership in MultiConnection->referencedPlacements */
|
/* membership in MultiConnection->referencedPlacements */
|
||||||
dlist_node connectionNode;
|
dlist_node connectionNode;
|
||||||
} ConnectionReference;
|
} ConnectionReference;
|
||||||
|
@ -357,6 +360,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
|
||||||
placementConnection->hadDML = false;
|
placementConnection->hadDML = false;
|
||||||
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
|
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
|
||||||
userName);
|
userName);
|
||||||
|
placementConnection->placementId = placementAccess->placement->placementId;
|
||||||
|
|
||||||
/* record association with connection */
|
/* record association with connection */
|
||||||
dlist_push_tail(&chosenConnection->referencedPlacements,
|
dlist_push_tail(&chosenConnection->referencedPlacements,
|
||||||
|
@ -785,6 +789,14 @@ ConnectionAccessedDifferentPlacement(MultiConnection *connection,
|
||||||
ConnectionReference *connectionReference =
|
ConnectionReference *connectionReference =
|
||||||
dlist_container(ConnectionReference, connectionNode, placementIter.cur);
|
dlist_container(ConnectionReference, connectionNode, placementIter.cur);
|
||||||
|
|
||||||
|
/* handle append and range distributed tables */
|
||||||
|
if (placement->partitionMethod != DISTRIBUTE_BY_HASH &&
|
||||||
|
placement->placementId != connectionReference->placementId)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* handle hash distributed tables */
|
||||||
if (placement->colocationGroupId != INVALID_COLOCATION_ID &&
|
if (placement->colocationGroupId != INVALID_COLOCATION_ID &&
|
||||||
placement->colocationGroupId == connectionReference->colocationGroupId &&
|
placement->colocationGroupId == connectionReference->colocationGroupId &&
|
||||||
placement->representativeValue != connectionReference->representativeValue)
|
placement->representativeValue != connectionReference->representativeValue)
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
#include "distributed/placement_connection.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
@ -175,6 +176,55 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MultiClientPlacementConnectStart asynchronously tries to establish a connection
|
||||||
|
* for a particular set of shard placements. If it succeeds, it returns the
|
||||||
|
* the connection id. Otherwise, it reports connection error and returns
|
||||||
|
* INVALID_CONNECTION_ID.
|
||||||
|
*/
|
||||||
|
int32
|
||||||
|
MultiClientPlacementConnectStart(List *placementAccessList, const char *userName)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = NULL;
|
||||||
|
ConnStatusType connStatusType = CONNECTION_OK;
|
||||||
|
int32 connectionId = AllocateConnectionId();
|
||||||
|
int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
|
||||||
|
|
||||||
|
if (connectionId == INVALID_CONNECTION_ID)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
|
||||||
|
return connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* prepare asynchronous request for worker node connection */
|
||||||
|
connection = StartPlacementListConnection(connectionFlags, placementAccessList,
|
||||||
|
userName);
|
||||||
|
|
||||||
|
ClaimConnectionExclusively(connection);
|
||||||
|
|
||||||
|
connStatusType = PQstatus(connection->pgConn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If prepared, we save the connection, and set its initial polling status
|
||||||
|
* to PGRES_POLLING_WRITING as specified in "Database Connection Control
|
||||||
|
* Functions" section of the PostgreSQL documentation.
|
||||||
|
*/
|
||||||
|
if (connStatusType != CONNECTION_BAD)
|
||||||
|
{
|
||||||
|
ClientConnectionArray[connectionId] = connection;
|
||||||
|
ClientPollingStatusArray[connectionId] = PGRES_POLLING_WRITING;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
|
||||||
|
connectionId = INVALID_CONNECTION_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
return connectionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* MultiClientConnectPoll returns the status of client connection. */
|
/* MultiClientConnectPoll returns the status of client connection. */
|
||||||
ConnectStatus
|
ConnectStatus
|
||||||
MultiClientConnectPoll(int32 connectionId)
|
MultiClientConnectPoll(int32 connectionId)
|
||||||
|
@ -229,6 +279,14 @@ MultiClientConnectPoll(int32 connectionId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* MultiClientGetConnection returns the connection with the given ID from the pool */
|
||||||
|
MultiConnection *
|
||||||
|
MultiClientGetConnection(int32 connectionId)
|
||||||
|
{
|
||||||
|
return ClientConnectionArray[connectionId];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* MultiClientDisconnect disconnects the connection. */
|
/* MultiClientDisconnect disconnects the connection. */
|
||||||
void
|
void
|
||||||
MultiClientDisconnect(int32 connectionId)
|
MultiClientDisconnect(int32 connectionId)
|
||||||
|
@ -247,6 +305,40 @@ MultiClientDisconnect(int32 connectionId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MultiClientReleaseConnection removes a connection from the client
|
||||||
|
* executor pool without disconnecting if it is run in the transaction
|
||||||
|
* otherwise it disconnects.
|
||||||
|
*
|
||||||
|
* This allows the connection to be used for other operations in the
|
||||||
|
* same transaction. The connection will still be closed at COMMIT
|
||||||
|
* or ABORT time.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
MultiClientReleaseConnection(int32 connectionId)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = NULL;
|
||||||
|
const int InvalidPollingStatus = -1;
|
||||||
|
|
||||||
|
Assert(connectionId != INVALID_CONNECTION_ID);
|
||||||
|
connection = ClientConnectionArray[connectionId];
|
||||||
|
Assert(connection != NULL);
|
||||||
|
|
||||||
|
/* allow using same connection only in the same transaction */
|
||||||
|
if (!InCoordinatedTransaction())
|
||||||
|
{
|
||||||
|
MultiClientDisconnect(connectionId);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UnclaimConnection(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientConnectionArray[connectionId] = NULL;
|
||||||
|
ClientPollingStatusArray[connectionId] = InvalidPollingStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MultiClientConnectionUp checks if the connection status is up, in other words,
|
* MultiClientConnectionUp checks if the connection status is up, in other words,
|
||||||
* it is not bad.
|
* it is not bad.
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "access/xact.h"
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
@ -29,6 +30,7 @@
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_resowner.h"
|
#include "distributed/multi_resowner.h"
|
||||||
|
#include "distributed/multi_router_executor.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
|
@ -88,6 +90,10 @@ MultiRealTimeExecute(Job *job)
|
||||||
|
|
||||||
workerNodeList = ActiveReadableNodeList();
|
workerNodeList = ActiveReadableNodeList();
|
||||||
workerHash = WorkerHash(workerHashName, workerNodeList);
|
workerHash = WorkerHash(workerHashName, workerNodeList);
|
||||||
|
if (IsTransactionBlock())
|
||||||
|
{
|
||||||
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
/* initialize task execution structures for remote execution */
|
/* initialize task execution structures for remote execution */
|
||||||
foreach(taskCell, taskList)
|
foreach(taskCell, taskList)
|
||||||
|
@ -259,8 +265,6 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
TaskExecStatus currentStatus = taskStatusArray[currentIndex];
|
TaskExecStatus currentStatus = taskStatusArray[currentIndex];
|
||||||
List *taskPlacementList = task->taskPlacementList;
|
List *taskPlacementList = task->taskPlacementList;
|
||||||
ShardPlacement *taskPlacement = list_nth(taskPlacementList, currentIndex);
|
ShardPlacement *taskPlacement = list_nth(taskPlacementList, currentIndex);
|
||||||
char *nodeName = taskPlacement->nodeName;
|
|
||||||
uint32 nodePort = taskPlacement->nodePort;
|
|
||||||
ConnectAction connectAction = CONNECT_ACTION_NONE;
|
ConnectAction connectAction = CONNECT_ACTION_NONE;
|
||||||
|
|
||||||
/* as most state transitions don't require blocking, default to not waiting */
|
/* as most state transitions don't require blocking, default to not waiting */
|
||||||
|
@ -271,12 +275,17 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
case EXEC_TASK_CONNECT_START:
|
case EXEC_TASK_CONNECT_START:
|
||||||
{
|
{
|
||||||
int32 connectionId = INVALID_CONNECTION_ID;
|
int32 connectionId = INVALID_CONNECTION_ID;
|
||||||
char *nodeDatabase = NULL;
|
List *relationShardList = task->relationShardList;
|
||||||
|
List *placementAccessList = NIL;
|
||||||
|
|
||||||
/* we use the same database name on the master and worker nodes */
|
/* create placement accesses for placements that appear in a subselect */
|
||||||
nodeDatabase = get_database_name(MyDatabaseId);
|
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
|
||||||
|
relationShardList);
|
||||||
|
|
||||||
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase,
|
/* should at least have an entry for the anchor shard */
|
||||||
|
Assert(list_length(placementAccessList) > 0);
|
||||||
|
|
||||||
|
connectionId = MultiClientPlacementConnectStart(placementAccessList,
|
||||||
NULL);
|
NULL);
|
||||||
connectionIdArray[currentIndex] = connectionId;
|
connectionIdArray[currentIndex] = connectionId;
|
||||||
|
|
||||||
|
@ -352,6 +361,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
|
|
||||||
case EXEC_TASK_FAILED:
|
case EXEC_TASK_FAILED:
|
||||||
{
|
{
|
||||||
|
bool raiseError = true;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* On task failure, we close the connection. We also reset our execution
|
* On task failure, we close the connection. We also reset our execution
|
||||||
* status assuming that we might fail on all other worker nodes and come
|
* status assuming that we might fail on all other worker nodes and come
|
||||||
|
@ -359,6 +370,15 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
* and compute task(s) on this node again.
|
* and compute task(s) on this node again.
|
||||||
*/
|
*/
|
||||||
int32 connectionId = connectionIdArray[currentIndex];
|
int32 connectionId = connectionIdArray[currentIndex];
|
||||||
|
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If this connection was previously marked as critical (e.g. it was used
|
||||||
|
* to perform a DDL command), then throw an error. Otherwise, mark it
|
||||||
|
* as failed and continue executing the query.
|
||||||
|
*/
|
||||||
|
MarkRemoteTransactionFailed(connection, raiseError);
|
||||||
|
|
||||||
MultiClientDisconnect(connectionId);
|
MultiClientDisconnect(connectionId);
|
||||||
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
|
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
|
||||||
connectAction = CONNECT_ACTION_CLOSED;
|
connectAction = CONNECT_ACTION_CLOSED;
|
||||||
|
@ -582,7 +602,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
taskStatusArray[currentIndex] = EXEC_TASK_DONE;
|
taskStatusArray[currentIndex] = EXEC_TASK_DONE;
|
||||||
|
|
||||||
/* we are done executing; we no longer need the connection */
|
/* we are done executing; we no longer need the connection */
|
||||||
MultiClientDisconnect(connectionId);
|
MultiClientReleaseConnection(connectionId);
|
||||||
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
|
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
|
||||||
connectAction = CONNECT_ACTION_CLOSED;
|
connectAction = CONNECT_ACTION_CLOSED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -594,6 +594,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
{
|
{
|
||||||
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
|
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
|
||||||
relationShardList);
|
relationShardList);
|
||||||
|
|
||||||
|
Assert(list_length(placementAccessList) == list_length(relationShardList));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -634,7 +636,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
/*
|
/*
|
||||||
* BuildPlacementSelectList builds a list of SELECT placement accesses
|
* BuildPlacementSelectList builds a list of SELECT placement accesses
|
||||||
* which can be used to call StartPlacementListConnection or
|
* which can be used to call StartPlacementListConnection or
|
||||||
* GetPlacementListConnection.
|
* GetPlacementListConnection. If the node group does not have a placement
|
||||||
|
* (e.g. in case of a broadcast join) then the shard is skipped.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
BuildPlacementSelectList(uint32 groupId, List *relationShardList)
|
BuildPlacementSelectList(uint32 groupId, List *relationShardList)
|
||||||
|
@ -651,8 +654,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList)
|
||||||
placement = FindShardPlacementOnGroup(groupId, relationShard->shardId);
|
placement = FindShardPlacementOnGroup(groupId, relationShard->shardId);
|
||||||
if (placement == NULL)
|
if (placement == NULL)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("no active placement of shard %ld found on group %d",
|
continue;
|
||||||
relationShard->shardId, groupId)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
|
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
|
||||||
|
@ -887,6 +889,8 @@ GetModifyConnections(Task *task, bool markCritical)
|
||||||
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
|
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
|
||||||
relationShardList);
|
relationShardList);
|
||||||
|
|
||||||
|
Assert(list_length(placementAccessList) == list_length(relationShardList));
|
||||||
|
|
||||||
/* create placement access for the placement that we're modifying */
|
/* create placement access for the placement that we're modifying */
|
||||||
placementModification = CreatePlacementAccess(taskPlacement,
|
placementModification = CreatePlacementAccess(taskPlacement,
|
||||||
PLACEMENT_ACCESS_DML);
|
PLACEMENT_ACCESS_DML);
|
||||||
|
|
|
@ -160,6 +160,7 @@ static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragment
|
||||||
static StringInfo NodeNameArrayString(List *workerNodeList);
|
static StringInfo NodeNameArrayString(List *workerNodeList);
|
||||||
static StringInfo NodePortArrayString(List *workerNodeList);
|
static StringInfo NodePortArrayString(List *workerNodeList);
|
||||||
static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId);
|
static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId);
|
||||||
|
static List * BuildRelationShardList(List *rangeTableList, List *fragmentList);
|
||||||
static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
|
static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
|
||||||
static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry,
|
static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry,
|
||||||
RangeTableFragment *fragment);
|
RangeTableFragment *fragment);
|
||||||
|
@ -2529,6 +2530,8 @@ SqlTaskList(Job *job)
|
||||||
|
|
||||||
sqlTask = CreateBasicTask(jobId, taskIdIndex, SQL_TASK, sqlQueryString->data);
|
sqlTask = CreateBasicTask(jobId, taskIdIndex, SQL_TASK, sqlQueryString->data);
|
||||||
sqlTask->dependedTaskList = dataFetchTaskList;
|
sqlTask->dependedTaskList = dataFetchTaskList;
|
||||||
|
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
|
||||||
|
fragmentCombination);
|
||||||
|
|
||||||
/* log the query string we generated */
|
/* log the query string we generated */
|
||||||
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
|
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
|
||||||
|
@ -3964,6 +3967,40 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BuildRelationShardList builds a list of RelationShard pairs for a task.
|
||||||
|
* This represents the mapping of range table entries to shard IDs for a
|
||||||
|
* task for the purposes of locking, deparsing, and connection management.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
BuildRelationShardList(List *rangeTableList, List *fragmentList)
|
||||||
|
{
|
||||||
|
List *relationShardList = NIL;
|
||||||
|
ListCell *fragmentCell = NULL;
|
||||||
|
|
||||||
|
foreach(fragmentCell, fragmentList)
|
||||||
|
{
|
||||||
|
RangeTableFragment *fragment = (RangeTableFragment *) lfirst(fragmentCell);
|
||||||
|
Index rangeTableId = fragment->rangeTableId;
|
||||||
|
RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableId, rangeTableList);
|
||||||
|
|
||||||
|
CitusRTEKind fragmentType = fragment->fragmentType;
|
||||||
|
if (fragmentType == CITUS_RTE_RELATION)
|
||||||
|
{
|
||||||
|
ShardInterval *shardInterval = (ShardInterval *) fragment->fragmentReference;
|
||||||
|
RelationShard *relationShard = CitusMakeNode(RelationShard);
|
||||||
|
|
||||||
|
relationShard->relationId = rangeTableEntry->relid;
|
||||||
|
relationShard->shardId = shardInterval->shardId;
|
||||||
|
|
||||||
|
relationShardList = lappend(relationShardList, relationShard);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return relationShardList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateRangeTableAlias walks over each fragment in the given fragment list,
|
* UpdateRangeTableAlias walks over each fragment in the given fragment list,
|
||||||
* and creates an alias that represents the fragment name to be used in the
|
* and creates an alias that represents the fragment name to be used in the
|
||||||
|
|
|
@ -14,6 +14,11 @@
|
||||||
#ifndef MULTI_CLIENT_EXECUTOR_H
|
#ifndef MULTI_CLIENT_EXECUTOR_H
|
||||||
#define MULTI_CLIENT_EXECUTOR_H
|
#define MULTI_CLIENT_EXECUTOR_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
|
||||||
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
|
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
|
||||||
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
|
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
|
||||||
#define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */
|
#define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */
|
||||||
|
@ -99,8 +104,12 @@ extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
|
||||||
const char *nodeDatabase, const char *nodeUser);
|
const char *nodeDatabase, const char *nodeUser);
|
||||||
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
|
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
|
||||||
const char *nodeDatabase, const char *nodeUser);
|
const char *nodeDatabase, const char *nodeUser);
|
||||||
|
extern int32 MultiClientPlacementConnectStart(List *placementAccessList,
|
||||||
|
const char *userName);
|
||||||
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
|
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
|
||||||
|
extern MultiConnection * MultiClientGetConnection(int32 connectionId);
|
||||||
extern void MultiClientDisconnect(int32 connectionId);
|
extern void MultiClientDisconnect(int32 connectionId);
|
||||||
|
extern void MultiClientReleaseConnection(int32 connectionId);
|
||||||
extern bool MultiClientConnectionUp(int32 connectionId);
|
extern bool MultiClientConnectionUp(int32 connectionId);
|
||||||
extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
|
extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
|
||||||
int *rowCount, int *columnCount);
|
int *rowCount, int *columnCount);
|
||||||
|
|
|
@ -2132,13 +2132,21 @@ RETURNING *;
|
||||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||||
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
|
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
|
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported
|
||||||
TRUNCATE raw_events_first;
|
TRUNCATE raw_events_first;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO raw_events_first (user_id, value_1)
|
INSERT INTO raw_events_first (user_id, value_1)
|
||||||
SELECT s, s FROM generate_series(1, 5) s;
|
SELECT s, s FROM generate_series(1, 5) s;
|
||||||
SELECT user_id, value_1 FROM raw_events_first;
|
SELECT user_id, value_1 FROM raw_events_first;
|
||||||
ERROR: cannot open new connections after the first modification command within a transaction
|
user_id | value_1
|
||||||
|
---------+---------
|
||||||
|
1 | 1
|
||||||
|
5 | 5
|
||||||
|
3 | 3
|
||||||
|
4 | 4
|
||||||
|
2 | 2
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- INSERT ... SELECT and single-shard SELECT in the same transaction is supported
|
-- INSERT ... SELECT and single-shard SELECT in the same transaction is supported
|
||||||
TRUNCATE raw_events_first;
|
TRUNCATE raw_events_first;
|
||||||
|
|
|
@ -0,0 +1,233 @@
|
||||||
|
SET citus.next_shard_id TO 1610000;
|
||||||
|
CREATE SCHEMA multi_real_time_transaction;
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
CREATE TABLE test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_distributed_table('test_table','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_distributed_table('co_test_table','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY co_test_table FROM stdin delimiter ',';
|
||||||
|
CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_reference_table('ref_test_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY ref_test_table FROM stdin delimiter ',';
|
||||||
|
-- Test with select and router insert
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table VALUES(7,8,'gg');
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with select and multi-row insert
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
9
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with INSERT .. SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table SELECT * FROM co_test_table;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
12
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with COPY
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
9
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with router update
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
27
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE test_table SET col_1 = 0 WHERE id = 2;
|
||||||
|
DELETE FROM test_table WHERE id = 3;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
20
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with multi-shard update
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
27
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE test_table SET col_1 = 5;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
30
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with subqueries
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
27
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE
|
||||||
|
test_table
|
||||||
|
SET
|
||||||
|
col_1 = 4
|
||||||
|
WHERE
|
||||||
|
test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
|
||||||
|
AND test_table.id = 1;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
29
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with partitioned table
|
||||||
|
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
-- create its partitions
|
||||||
|
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||||
|
-- load some data and distribute tables
|
||||||
|
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||||
|
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||||
|
SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM partitioning_test;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
|
||||||
|
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
|
||||||
|
SELECT COUNT(*) FROM partitioning_test;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
DROP TABLE partitioning_test;
|
||||||
|
-- Test with create-drop table
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE test_table_inn(id int, num_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_inn','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
|
||||||
|
SELECT COUNT(*) FROM test_table_inn;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test_table_inn;
|
||||||
|
COMMIT;
|
||||||
|
-- Test with utility functions
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE INDEX tt_ind_1 ON test_table(col_1);
|
||||||
|
ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with foreign key
|
||||||
|
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
|
||||||
|
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM test_table where id = 1 or id = 3;
|
||||||
|
SELECT * FROM co_test_table;
|
||||||
|
id | col_1 | col_2
|
||||||
|
----+-------+--------
|
||||||
|
2 | 30 | 'bb10'
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
DROP SCHEMA multi_real_time_transaction CASCADE;
|
||||||
|
NOTICE: drop cascades to 3 other objects
|
||||||
|
DETAIL: drop cascades to table test_table
|
||||||
|
drop cascades to table co_test_table
|
||||||
|
drop cascades to table ref_test_table
|
|
@ -0,0 +1,241 @@
|
||||||
|
SET citus.next_shard_id TO 1610000;
|
||||||
|
CREATE SCHEMA multi_real_time_transaction;
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
CREATE TABLE test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_distributed_table('test_table','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_distributed_table('co_test_table','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY co_test_table FROM stdin delimiter ',';
|
||||||
|
CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_reference_table('ref_test_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY ref_test_table FROM stdin delimiter ',';
|
||||||
|
-- Test with select and router insert
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table VALUES(7,8,'gg');
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with select and multi-row insert
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
9
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with INSERT .. SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table SELECT * FROM co_test_table;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
12
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with COPY
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
9
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with router update
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
27
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE test_table SET col_1 = 0 WHERE id = 2;
|
||||||
|
DELETE FROM test_table WHERE id = 3;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
20
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with multi-shard update
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
27
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE test_table SET col_1 = 5;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
30
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with subqueries
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
27
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE
|
||||||
|
test_table
|
||||||
|
SET
|
||||||
|
col_1 = 4
|
||||||
|
WHERE
|
||||||
|
test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
|
||||||
|
AND test_table.id = 1;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
sum
|
||||||
|
-----
|
||||||
|
29
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with partitioned table
|
||||||
|
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
|
||||||
|
^
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
-- create its partitions
|
||||||
|
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
|
||||||
|
^
|
||||||
|
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin...
|
||||||
|
^
|
||||||
|
-- load some data and distribute tables
|
||||||
|
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||||
|
ERROR: relation "partitioning_test" does not exist
|
||||||
|
LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||||
|
^
|
||||||
|
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||||
|
ERROR: relation "partitioning_test" does not exist
|
||||||
|
LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||||
|
^
|
||||||
|
SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
ERROR: relation "partitioning_test" does not exist
|
||||||
|
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
^
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM partitioning_test;
|
||||||
|
ERROR: relation "partitioning_test" does not exist
|
||||||
|
LINE 1: SELECT COUNT(*) FROM partitioning_test;
|
||||||
|
^
|
||||||
|
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
SELECT COUNT(*) FROM partitioning_test;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
DROP TABLE partitioning_test;
|
||||||
|
ERROR: table "partitioning_test" does not exist
|
||||||
|
-- Test with create-drop table
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE test_table_inn(id int, num_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_inn','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
|
||||||
|
SELECT COUNT(*) FROM test_table_inn;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test_table_inn;
|
||||||
|
COMMIT;
|
||||||
|
-- Test with utility functions
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE INDEX tt_ind_1 ON test_table(col_1);
|
||||||
|
ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test with foreign key
|
||||||
|
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
|
||||||
|
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM test_table where id = 1 or id = 3;
|
||||||
|
SELECT * FROM co_test_table;
|
||||||
|
id | col_1 | col_2
|
||||||
|
----+-------+--------
|
||||||
|
2 | 30 | 'bb10'
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
DROP SCHEMA multi_real_time_transaction CASCADE;
|
||||||
|
NOTICE: drop cascades to 3 other objects
|
||||||
|
DETAIL: drop cascades to table test_table
|
||||||
|
drop cascades to table co_test_table
|
||||||
|
drop cascades to table ref_test_table
|
|
@ -315,13 +315,13 @@ DROP FUNCTION log_ddl_tag();
|
||||||
DROP TABLE ddl_commands;
|
DROP TABLE ddl_commands;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- Distributed SELECTs cannot appear after ALTER
|
-- Distributed SELECTs may appear after ALTER
|
||||||
BEGIN;
|
BEGIN;
|
||||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||||
SELECT count(*) FROM lineitem_alter;
|
SELECT count(*) FROM lineitem_alter;
|
||||||
COMMIT;
|
ROLLBACK;
|
||||||
|
|
||||||
-- but are allowed before
|
-- and before
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT count(*) FROM lineitem_alter;
|
SELECT count(*) FROM lineitem_alter;
|
||||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||||
|
|
|
@ -32,6 +32,7 @@ test: multi_read_from_secondaries
|
||||||
test: multi_create_table
|
test: multi_create_table
|
||||||
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
|
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
|
||||||
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_insert_select_window multi_shard_update_delete
|
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_insert_select_window multi_shard_update_delete
|
||||||
|
test: multi_real_time_transaction
|
||||||
|
|
||||||
# ----------
|
# ----------
|
||||||
# Tests for partitioning support
|
# Tests for partitioning support
|
||||||
|
|
|
@ -673,13 +673,17 @@ DROP EVENT TRIGGER log_ddl_tag;
|
||||||
DROP FUNCTION log_ddl_tag();
|
DROP FUNCTION log_ddl_tag();
|
||||||
DROP TABLE ddl_commands;
|
DROP TABLE ddl_commands;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- Distributed SELECTs cannot appear after ALTER
|
-- Distributed SELECTs may appear after ALTER
|
||||||
BEGIN;
|
BEGIN;
|
||||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||||
SELECT count(*) FROM lineitem_alter;
|
SELECT count(*) FROM lineitem_alter;
|
||||||
ERROR: cannot open new connections after the first modification command within a transaction
|
count
|
||||||
COMMIT;
|
-------
|
||||||
-- but are allowed before
|
18000
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- and before
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT count(*) FROM lineitem_alter;
|
SELECT count(*) FROM lineitem_alter;
|
||||||
count
|
count
|
||||||
|
|
|
@ -1716,7 +1716,7 @@ RETURNING *;
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
|
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported
|
||||||
TRUNCATE raw_events_first;
|
TRUNCATE raw_events_first;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
|
@ -0,0 +1,147 @@
|
||||||
|
SET citus.next_shard_id TO 1610000;
|
||||||
|
|
||||||
|
CREATE SCHEMA multi_real_time_transaction;
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
|
||||||
|
CREATE TABLE test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_distributed_table('test_table','id');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2,'aa'
|
||||||
|
2,3,'bb'
|
||||||
|
3,4,'cc'
|
||||||
|
4,5,'dd'
|
||||||
|
5,6,'ee'
|
||||||
|
6,7,'ff'
|
||||||
|
\.
|
||||||
|
|
||||||
|
CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_distributed_table('co_test_table','id');
|
||||||
|
\COPY co_test_table FROM stdin delimiter ',';
|
||||||
|
1,20,'aa10'
|
||||||
|
2,30,'bb10'
|
||||||
|
3,40,'cc10'
|
||||||
|
3,4,'cc1'
|
||||||
|
3,5,'cc2'
|
||||||
|
1,2,'cc2'
|
||||||
|
\.
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
|
||||||
|
SELECT create_reference_table('ref_test_table');
|
||||||
|
\COPY ref_test_table FROM stdin delimiter ',';
|
||||||
|
1,2,'rr1'
|
||||||
|
2,3,'rr2'
|
||||||
|
3,4,'rr3'
|
||||||
|
4,5,'rr4'
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Test with select and router insert
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
INSERT INTO test_table VALUES(7,8,'gg');
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with select and multi-row insert
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with INSERT .. SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
INSERT INTO test_table SELECT * FROM co_test_table;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with COPY
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
8,9,'gg'
|
||||||
|
9,10,'hh'
|
||||||
|
10,11,'ii'
|
||||||
|
\.
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with router update
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
UPDATE test_table SET col_1 = 0 WHERE id = 2;
|
||||||
|
DELETE FROM test_table WHERE id = 3;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with multi-shard update
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
UPDATE test_table SET col_1 = 5;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with subqueries
|
||||||
|
BEGIN;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
UPDATE
|
||||||
|
test_table
|
||||||
|
SET
|
||||||
|
col_1 = 4
|
||||||
|
WHERE
|
||||||
|
test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
|
||||||
|
AND test_table.id = 1;
|
||||||
|
SELECT SUM(col_1) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with partitioned table
|
||||||
|
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
-- create its partitions
|
||||||
|
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||||
|
|
||||||
|
-- load some data and distribute tables
|
||||||
|
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||||
|
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||||
|
SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM partitioning_test;
|
||||||
|
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
|
||||||
|
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
|
||||||
|
SELECT COUNT(*) FROM partitioning_test;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
DROP TABLE partitioning_test;
|
||||||
|
|
||||||
|
-- Test with create-drop table
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE test_table_inn(id int, num_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_inn','id');
|
||||||
|
INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
|
||||||
|
SELECT COUNT(*) FROM test_table_inn;
|
||||||
|
DROP TABLE test_table_inn;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- Test with utility functions
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
CREATE INDEX tt_ind_1 ON test_table(col_1);
|
||||||
|
ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test with foreign key
|
||||||
|
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
|
||||||
|
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM test_table where id = 1 or id = 3;
|
||||||
|
SELECT * FROM co_test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
DROP SCHEMA multi_real_time_transaction CASCADE;
|
Loading…
Reference in New Issue