Make real time executor work in transactions

pull/1785/head
Marco Slot 2017-11-10 16:06:13 +01:00 committed by velioglu
parent 73cadbecd6
commit a9933deac6
15 changed files with 839 additions and 22 deletions

View File

@ -821,10 +821,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeUser = CurrentUserName();
MultiConnection *connection = NULL;
uint32 connectionFlags = FOR_DML | CONNECTION_PER_PLACEMENT;
uint32 connectionFlags = FOR_DML;
StringInfo copyCommand = NULL;
PGresult *result = NULL;
/*
* Make sure we use a separate connection per placement for hash-distributed
* tables in order to allow multi-shard modifications in the same transaction.
*/
if (placement->partitionMethod == DISTRIBUTE_BY_HASH)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
}
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
if (PQstatus(connection->pgConn) != CONNECTION_OK)

View File

@ -54,6 +54,9 @@ typedef struct ConnectionReference
uint32 colocationGroupId;
uint32 representativeValue;
/* placementId of the placement, used only for append distributed tables */
uint64 placementId;
/* membership in MultiConnection->referencedPlacements */
dlist_node connectionNode;
} ConnectionReference;
@ -357,6 +360,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
placementConnection->hadDML = false;
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
userName);
placementConnection->placementId = placementAccess->placement->placementId;
/* record association with connection */
dlist_push_tail(&chosenConnection->referencedPlacements,
@ -785,6 +789,14 @@ ConnectionAccessedDifferentPlacement(MultiConnection *connection,
ConnectionReference *connectionReference =
dlist_container(ConnectionReference, connectionNode, placementIter.cur);
/* handle append and range distributed tables */
if (placement->partitionMethod != DISTRIBUTE_BY_HASH &&
placement->placementId != connectionReference->placementId)
{
return true;
}
/* handle hash distributed tables */
if (placement->colocationGroupId != INVALID_COLOCATION_ID &&
placement->colocationGroupId == connectionReference->colocationGroupId &&
placement->representativeValue != connectionReference->representativeValue)

View File

@ -22,6 +22,7 @@
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include <errno.h>
@ -175,6 +176,55 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
}
/*
* MultiClientPlacementConnectStart asynchronously tries to establish a connection
* for a particular set of shard placements. If it succeeds, it returns the
* the connection id. Otherwise, it reports connection error and returns
* INVALID_CONNECTION_ID.
*/
int32
MultiClientPlacementConnectStart(List *placementAccessList, const char *userName)
{
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
int32 connectionId = AllocateConnectionId();
int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
return connectionId;
}
/* prepare asynchronous request for worker node connection */
connection = StartPlacementListConnection(connectionFlags, placementAccessList,
userName);
ClaimConnectionExclusively(connection);
connStatusType = PQstatus(connection->pgConn);
/*
* If prepared, we save the connection, and set its initial polling status
* to PGRES_POLLING_WRITING as specified in "Database Connection Control
* Functions" section of the PostgreSQL documentation.
*/
if (connStatusType != CONNECTION_BAD)
{
ClientConnectionArray[connectionId] = connection;
ClientPollingStatusArray[connectionId] = PGRES_POLLING_WRITING;
}
else
{
ReportConnectionError(connection, WARNING);
connectionId = INVALID_CONNECTION_ID;
}
return connectionId;
}
/* MultiClientConnectPoll returns the status of client connection. */
ConnectStatus
MultiClientConnectPoll(int32 connectionId)
@ -229,6 +279,14 @@ MultiClientConnectPoll(int32 connectionId)
}
/* MultiClientGetConnection returns the connection with the given ID from the pool */
MultiConnection *
MultiClientGetConnection(int32 connectionId)
{
return ClientConnectionArray[connectionId];
}
/* MultiClientDisconnect disconnects the connection. */
void
MultiClientDisconnect(int32 connectionId)
@ -247,6 +305,40 @@ MultiClientDisconnect(int32 connectionId)
}
/*
* MultiClientReleaseConnection removes a connection from the client
* executor pool without disconnecting if it is run in the transaction
* otherwise it disconnects.
*
* This allows the connection to be used for other operations in the
* same transaction. The connection will still be closed at COMMIT
* or ABORT time.
*/
void
MultiClientReleaseConnection(int32 connectionId)
{
MultiConnection *connection = NULL;
const int InvalidPollingStatus = -1;
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
/* allow using same connection only in the same transaction */
if (!InCoordinatedTransaction())
{
MultiClientDisconnect(connectionId);
}
else
{
UnclaimConnection(connection);
}
ClientConnectionArray[connectionId] = NULL;
ClientPollingStatusArray[connectionId] = InvalidPollingStatus;
}
/*
* MultiClientConnectionUp checks if the connection status is up, in other words,
* it is not bad.

View File

@ -22,6 +22,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include "access/xact.h"
#include "commands/dbcommands.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/connection_management.h"
@ -29,6 +30,7 @@
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_resowner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_protocol.h"
@ -88,6 +90,10 @@ MultiRealTimeExecute(Job *job)
workerNodeList = ActiveReadableNodeList();
workerHash = WorkerHash(workerHashName, workerNodeList);
if (IsTransactionBlock())
{
BeginOrContinueCoordinatedTransaction();
}
/* initialize task execution structures for remote execution */
foreach(taskCell, taskList)
@ -259,8 +265,6 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
TaskExecStatus currentStatus = taskStatusArray[currentIndex];
List *taskPlacementList = task->taskPlacementList;
ShardPlacement *taskPlacement = list_nth(taskPlacementList, currentIndex);
char *nodeName = taskPlacement->nodeName;
uint32 nodePort = taskPlacement->nodePort;
ConnectAction connectAction = CONNECT_ACTION_NONE;
/* as most state transitions don't require blocking, default to not waiting */
@ -271,12 +275,17 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
case EXEC_TASK_CONNECT_START:
{
int32 connectionId = INVALID_CONNECTION_ID;
char *nodeDatabase = NULL;
List *relationShardList = task->relationShardList;
List *placementAccessList = NIL;
/* we use the same database name on the master and worker nodes */
nodeDatabase = get_database_name(MyDatabaseId);
/* create placement accesses for placements that appear in a subselect */
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
relationShardList);
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase,
/* should at least have an entry for the anchor shard */
Assert(list_length(placementAccessList) > 0);
connectionId = MultiClientPlacementConnectStart(placementAccessList,
NULL);
connectionIdArray[currentIndex] = connectionId;
@ -352,6 +361,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
case EXEC_TASK_FAILED:
{
bool raiseError = true;
/*
* On task failure, we close the connection. We also reset our execution
* status assuming that we might fail on all other worker nodes and come
@ -359,6 +370,15 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
* and compute task(s) on this node again.
*/
int32 connectionId = connectionIdArray[currentIndex];
MultiConnection *connection = MultiClientGetConnection(connectionId);
/*
* If this connection was previously marked as critical (e.g. it was used
* to perform a DDL command), then throw an error. Otherwise, mark it
* as failed and continue executing the query.
*/
MarkRemoteTransactionFailed(connection, raiseError);
MultiClientDisconnect(connectionId);
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
connectAction = CONNECT_ACTION_CLOSED;
@ -582,7 +602,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
taskStatusArray[currentIndex] = EXEC_TASK_DONE;
/* we are done executing; we no longer need the connection */
MultiClientDisconnect(connectionId);
MultiClientReleaseConnection(connectionId);
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
connectAction = CONNECT_ACTION_CLOSED;
}

View File

@ -594,6 +594,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
{
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
relationShardList);
Assert(list_length(placementAccessList) == list_length(relationShardList));
}
else
{
@ -634,7 +636,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
/*
* BuildPlacementSelectList builds a list of SELECT placement accesses
* which can be used to call StartPlacementListConnection or
* GetPlacementListConnection.
* GetPlacementListConnection. If the node group does not have a placement
* (e.g. in case of a broadcast join) then the shard is skipped.
*/
List *
BuildPlacementSelectList(uint32 groupId, List *relationShardList)
@ -651,8 +654,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList)
placement = FindShardPlacementOnGroup(groupId, relationShard->shardId);
if (placement == NULL)
{
ereport(ERROR, (errmsg("no active placement of shard %ld found on group %d",
relationShard->shardId, groupId)));
continue;
}
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
@ -887,6 +889,8 @@ GetModifyConnections(Task *task, bool markCritical)
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
relationShardList);
Assert(list_length(placementAccessList) == list_length(relationShardList));
/* create placement access for the placement that we're modifying */
placementModification = CreatePlacementAccess(taskPlacement,
PLACEMENT_ACCESS_DML);

View File

@ -160,6 +160,7 @@ static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragment
static StringInfo NodeNameArrayString(List *workerNodeList);
static StringInfo NodePortArrayString(List *workerNodeList);
static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId);
static List * BuildRelationShardList(List *rangeTableList, List *fragmentList);
static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry,
RangeTableFragment *fragment);
@ -2529,6 +2530,8 @@ SqlTaskList(Job *job)
sqlTask = CreateBasicTask(jobId, taskIdIndex, SQL_TASK, sqlQueryString->data);
sqlTask->dependedTaskList = dataFetchTaskList;
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
fragmentCombination);
/* log the query string we generated */
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
@ -3964,6 +3967,40 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin
}
/*
* BuildRelationShardList builds a list of RelationShard pairs for a task.
* This represents the mapping of range table entries to shard IDs for a
* task for the purposes of locking, deparsing, and connection management.
*/
static List *
BuildRelationShardList(List *rangeTableList, List *fragmentList)
{
List *relationShardList = NIL;
ListCell *fragmentCell = NULL;
foreach(fragmentCell, fragmentList)
{
RangeTableFragment *fragment = (RangeTableFragment *) lfirst(fragmentCell);
Index rangeTableId = fragment->rangeTableId;
RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableId, rangeTableList);
CitusRTEKind fragmentType = fragment->fragmentType;
if (fragmentType == CITUS_RTE_RELATION)
{
ShardInterval *shardInterval = (ShardInterval *) fragment->fragmentReference;
RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = rangeTableEntry->relid;
relationShard->shardId = shardInterval->shardId;
relationShardList = lappend(relationShardList, relationShard);
}
}
return relationShardList;
}
/*
* UpdateRangeTableAlias walks over each fragment in the given fragment list,
* and creates an alias that represents the fragment name to be used in the

View File

@ -14,6 +14,11 @@
#ifndef MULTI_CLIENT_EXECUTOR_H
#define MULTI_CLIENT_EXECUTOR_H
#include "distributed/connection_management.h"
#include "nodes/pg_list.h"
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
#define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */
@ -99,8 +104,12 @@ extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
extern int32 MultiClientPlacementConnectStart(List *placementAccessList,
const char *userName);
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
extern MultiConnection * MultiClientGetConnection(int32 connectionId);
extern void MultiClientDisconnect(int32 connectionId);
extern void MultiClientReleaseConnection(int32 connectionId);
extern bool MultiClientConnectionUp(int32 connectionId);
extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
int *rowCount, int *columnCount);

View File

@ -2132,13 +2132,21 @@ RETURNING *;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
RESET client_min_messages;
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported
TRUNCATE raw_events_first;
BEGIN;
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, s FROM generate_series(1, 5) s;
SELECT user_id, value_1 FROM raw_events_first;
ERROR: cannot open new connections after the first modification command within a transaction
user_id | value_1
---------+---------
1 | 1
5 | 5
3 | 3
4 | 4
2 | 2
(5 rows)
ROLLBACK;
-- INSERT ... SELECT and single-shard SELECT in the same transaction is supported
TRUNCATE raw_events_first;

View File

@ -0,0 +1,233 @@
SET citus.next_shard_id TO 1610000;
CREATE SCHEMA multi_real_time_transaction;
SET search_path = 'multi_real_time_transaction';
SET citus.shard_replication_factor to 1;
CREATE TABLE test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('test_table','id');
create_distributed_table
--------------------------
(1 row)
\COPY test_table FROM stdin delimiter ',';
CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('co_test_table','id');
create_distributed_table
--------------------------
(1 row)
\COPY co_test_table FROM stdin delimiter ',';
CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
SELECT create_reference_table('ref_test_table');
create_reference_table
------------------------
(1 row)
\COPY ref_test_table FROM stdin delimiter ',';
-- Test with select and router insert
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
INSERT INTO test_table VALUES(7,8,'gg');
SELECT COUNT(*) FROM test_table;
count
-------
7
(1 row)
ROLLBACK;
-- Test with select and multi-row insert
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
SELECT COUNT(*) FROM test_table;
count
-------
9
(1 row)
ROLLBACK;
-- Test with INSERT .. SELECT
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
INSERT INTO test_table SELECT * FROM co_test_table;
SELECT COUNT(*) FROM test_table;
count
-------
12
(1 row)
ROLLBACK;
-- Test with COPY
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
\COPY test_table FROM stdin delimiter ',';
SELECT COUNT(*) FROM test_table;
count
-------
9
(1 row)
ROLLBACK;
-- Test with router update
BEGIN;
SELECT SUM(col_1) FROM test_table;
sum
-----
27
(1 row)
UPDATE test_table SET col_1 = 0 WHERE id = 2;
DELETE FROM test_table WHERE id = 3;
SELECT SUM(col_1) FROM test_table;
sum
-----
20
(1 row)
ROLLBACK;
-- Test with multi-shard update
BEGIN;
SELECT SUM(col_1) FROM test_table;
sum
-----
27
(1 row)
UPDATE test_table SET col_1 = 5;
SELECT SUM(col_1) FROM test_table;
sum
-----
30
(1 row)
ROLLBACK;
-- Test with subqueries
BEGIN;
SELECT SUM(col_1) FROM test_table;
sum
-----
27
(1 row)
UPDATE
test_table
SET
col_1 = 4
WHERE
test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
AND test_table.id = 1;
SELECT SUM(col_1) FROM test_table;
sum
-----
29
(1 row)
ROLLBACK;
-- Test with partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
SET citus.shard_replication_factor TO 1;
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
SELECT create_distributed_table('partitioning_test', 'id');
NOTICE: Copying data from local table...
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
BEGIN;
SELECT COUNT(*) FROM partitioning_test;
count
-------
2
(1 row)
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
SELECT COUNT(*) FROM partitioning_test;
count
-------
4
(1 row)
COMMIT;
DROP TABLE partitioning_test;
-- Test with create-drop table
BEGIN;
CREATE TABLE test_table_inn(id int, num_1 int);
SELECT create_distributed_table('test_table_inn','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
SELECT COUNT(*) FROM test_table_inn;
count
-------
3
(1 row)
DROP TABLE test_table_inn;
COMMIT;
-- Test with utility functions
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
CREATE INDEX tt_ind_1 ON test_table(col_1);
ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
ROLLBACK;
-- Test with foreign key
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
BEGIN;
DELETE FROM test_table where id = 1 or id = 3;
SELECT * FROM co_test_table;
id | col_1 | col_2
----+-------+--------
2 | 30 | 'bb10'
(1 row)
ROLLBACK;
DROP SCHEMA multi_real_time_transaction CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table test_table
drop cascades to table co_test_table
drop cascades to table ref_test_table

View File

@ -0,0 +1,241 @@
SET citus.next_shard_id TO 1610000;
CREATE SCHEMA multi_real_time_transaction;
SET search_path = 'multi_real_time_transaction';
SET citus.shard_replication_factor to 1;
CREATE TABLE test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('test_table','id');
create_distributed_table
--------------------------
(1 row)
\COPY test_table FROM stdin delimiter ',';
CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('co_test_table','id');
create_distributed_table
--------------------------
(1 row)
\COPY co_test_table FROM stdin delimiter ',';
CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
SELECT create_reference_table('ref_test_table');
create_reference_table
------------------------
(1 row)
\COPY ref_test_table FROM stdin delimiter ',';
-- Test with select and router insert
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
INSERT INTO test_table VALUES(7,8,'gg');
SELECT COUNT(*) FROM test_table;
count
-------
7
(1 row)
ROLLBACK;
-- Test with select and multi-row insert
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
SELECT COUNT(*) FROM test_table;
count
-------
9
(1 row)
ROLLBACK;
-- Test with INSERT .. SELECT
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
INSERT INTO test_table SELECT * FROM co_test_table;
SELECT COUNT(*) FROM test_table;
count
-------
12
(1 row)
ROLLBACK;
-- Test with COPY
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
\COPY test_table FROM stdin delimiter ',';
SELECT COUNT(*) FROM test_table;
count
-------
9
(1 row)
ROLLBACK;
-- Test with router update
BEGIN;
SELECT SUM(col_1) FROM test_table;
sum
-----
27
(1 row)
UPDATE test_table SET col_1 = 0 WHERE id = 2;
DELETE FROM test_table WHERE id = 3;
SELECT SUM(col_1) FROM test_table;
sum
-----
20
(1 row)
ROLLBACK;
-- Test with multi-shard update
BEGIN;
SELECT SUM(col_1) FROM test_table;
sum
-----
27
(1 row)
UPDATE test_table SET col_1 = 5;
SELECT SUM(col_1) FROM test_table;
sum
-----
30
(1 row)
ROLLBACK;
-- Test with subqueries
BEGIN;
SELECT SUM(col_1) FROM test_table;
sum
-----
27
(1 row)
UPDATE
test_table
SET
col_1 = 4
WHERE
test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
AND test_table.id = 1;
SELECT SUM(col_1) FROM test_table;
sum
-----
29
(1 row)
ROLLBACK;
-- Test with partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
^
SET citus.shard_replication_factor TO 1;
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
^
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin...
^
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06');
^
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07');
^
SELECT create_distributed_table('partitioning_test', 'id');
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
^
BEGIN;
SELECT COUNT(*) FROM partitioning_test;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT COUNT(*) FROM partitioning_test;
^
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
ERROR: current transaction is aborted, commands ignored until end of transaction block
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
ERROR: current transaction is aborted, commands ignored until end of transaction block
SELECT COUNT(*) FROM partitioning_test;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
DROP TABLE partitioning_test;
ERROR: table "partitioning_test" does not exist
-- Test with create-drop table
BEGIN;
CREATE TABLE test_table_inn(id int, num_1 int);
SELECT create_distributed_table('test_table_inn','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
SELECT COUNT(*) FROM test_table_inn;
count
-------
3
(1 row)
DROP TABLE test_table_inn;
COMMIT;
-- Test with utility functions
BEGIN;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
CREATE INDEX tt_ind_1 ON test_table(col_1);
ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
ROLLBACK;
-- Test with foreign key
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
BEGIN;
DELETE FROM test_table where id = 1 or id = 3;
SELECT * FROM co_test_table;
id | col_1 | col_2
----+-------+--------
2 | 30 | 'bb10'
(1 row)
ROLLBACK;
DROP SCHEMA multi_real_time_transaction CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table test_table
drop cascades to table co_test_table
drop cascades to table ref_test_table

View File

@ -315,13 +315,13 @@ DROP FUNCTION log_ddl_tag();
DROP TABLE ddl_commands;
\c - - - :master_port
-- Distributed SELECTs cannot appear after ALTER
-- Distributed SELECTs may appear after ALTER
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SELECT count(*) FROM lineitem_alter;
COMMIT;
ROLLBACK;
-- but are allowed before
-- and before
BEGIN;
SELECT count(*) FROM lineitem_alter;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);

View File

@ -32,6 +32,7 @@ test: multi_read_from_secondaries
test: multi_create_table
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_insert_select_window multi_shard_update_delete
test: multi_real_time_transaction
# ----------
# Tests for partitioning support

View File

@ -673,13 +673,17 @@ DROP EVENT TRIGGER log_ddl_tag;
DROP FUNCTION log_ddl_tag();
DROP TABLE ddl_commands;
\c - - - :master_port
-- Distributed SELECTs cannot appear after ALTER
-- Distributed SELECTs may appear after ALTER
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SELECT count(*) FROM lineitem_alter;
ERROR: cannot open new connections after the first modification command within a transaction
COMMIT;
-- but are allowed before
count
-------
18000
(1 row)
ROLLBACK;
-- and before
BEGIN;
SELECT count(*) FROM lineitem_alter;
count

View File

@ -1716,7 +1716,7 @@ RETURNING *;
RESET client_min_messages;
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported
TRUNCATE raw_events_first;
BEGIN;

View File

@ -0,0 +1,147 @@
SET citus.next_shard_id TO 1610000;
CREATE SCHEMA multi_real_time_transaction;
SET search_path = 'multi_real_time_transaction';
SET citus.shard_replication_factor to 1;
CREATE TABLE test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('test_table','id');
\COPY test_table FROM stdin delimiter ',';
1,2,'aa'
2,3,'bb'
3,4,'cc'
4,5,'dd'
5,6,'ee'
6,7,'ff'
\.
CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('co_test_table','id');
\COPY co_test_table FROM stdin delimiter ',';
1,20,'aa10'
2,30,'bb10'
3,40,'cc10'
3,4,'cc1'
3,5,'cc2'
1,2,'cc2'
\.
CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
SELECT create_reference_table('ref_test_table');
\COPY ref_test_table FROM stdin delimiter ',';
1,2,'rr1'
2,3,'rr2'
3,4,'rr3'
4,5,'rr4'
\.
-- Test with select and router insert
BEGIN;
SELECT COUNT(*) FROM test_table;
INSERT INTO test_table VALUES(7,8,'gg');
SELECT COUNT(*) FROM test_table;
ROLLBACK;
-- Test with select and multi-row insert
BEGIN;
SELECT COUNT(*) FROM test_table;
INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
SELECT COUNT(*) FROM test_table;
ROLLBACK;
-- Test with INSERT .. SELECT
BEGIN;
SELECT COUNT(*) FROM test_table;
INSERT INTO test_table SELECT * FROM co_test_table;
SELECT COUNT(*) FROM test_table;
ROLLBACK;
-- Test with COPY
BEGIN;
SELECT COUNT(*) FROM test_table;
\COPY test_table FROM stdin delimiter ',';
8,9,'gg'
9,10,'hh'
10,11,'ii'
\.
SELECT COUNT(*) FROM test_table;
ROLLBACK;
-- Test with router update
BEGIN;
SELECT SUM(col_1) FROM test_table;
UPDATE test_table SET col_1 = 0 WHERE id = 2;
DELETE FROM test_table WHERE id = 3;
SELECT SUM(col_1) FROM test_table;
ROLLBACK;
-- Test with multi-shard update
BEGIN;
SELECT SUM(col_1) FROM test_table;
UPDATE test_table SET col_1 = 5;
SELECT SUM(col_1) FROM test_table;
ROLLBACK;
-- Test with subqueries
BEGIN;
SELECT SUM(col_1) FROM test_table;
UPDATE
test_table
SET
col_1 = 4
WHERE
test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
AND test_table.id = 1;
SELECT SUM(col_1) FROM test_table;
ROLLBACK;
-- Test with partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
SET citus.shard_replication_factor TO 1;
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
SELECT create_distributed_table('partitioning_test', 'id');
BEGIN;
SELECT COUNT(*) FROM partitioning_test;
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
SELECT COUNT(*) FROM partitioning_test;
COMMIT;
DROP TABLE partitioning_test;
-- Test with create-drop table
BEGIN;
CREATE TABLE test_table_inn(id int, num_1 int);
SELECT create_distributed_table('test_table_inn','id');
INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
SELECT COUNT(*) FROM test_table_inn;
DROP TABLE test_table_inn;
COMMIT;
-- Test with utility functions
BEGIN;
SELECT COUNT(*) FROM test_table;
CREATE INDEX tt_ind_1 ON test_table(col_1);
ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
SELECT COUNT(*) FROM test_table;
ROLLBACK;
-- Test with foreign key
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
BEGIN;
DELETE FROM test_table where id = 1 or id = 3;
SELECT * FROM co_test_table;
ROLLBACK;
DROP SCHEMA multi_real_time_transaction CASCADE;