diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index ab0c8f485..af0fd69d2 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -821,10 +821,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
 		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
 		char *nodeUser = CurrentUserName();
 		MultiConnection *connection = NULL;
-		uint32 connectionFlags = FOR_DML | CONNECTION_PER_PLACEMENT;
+		uint32 connectionFlags = FOR_DML;
 		StringInfo copyCommand = NULL;
 		PGresult *result = NULL;
 
+		/*
+		 * Make sure we use a separate connection per placement for hash-distributed
+		 * tables in order to allow multi-shard modifications in the same transaction.
+		 */
+		if (placement->partitionMethod == DISTRIBUTE_BY_HASH)
+		{
+			connectionFlags |= CONNECTION_PER_PLACEMENT;
+		}
+
 		connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
 
 		if (PQstatus(connection->pgConn) != CONNECTION_OK)
diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c
index 3554180d6..9c3dfc349 100644
--- a/src/backend/distributed/connection/placement_connection.c
+++ b/src/backend/distributed/connection/placement_connection.c
@@ -54,6 +54,9 @@ typedef struct ConnectionReference
 	uint32 colocationGroupId;
 	uint32 representativeValue;
 
+	/* placementId of the placement, used only for append distributed tables */
+	uint64 placementId;
+
 	/* membership in MultiConnection->referencedPlacements */
 	dlist_node connectionNode;
 } ConnectionReference;
@@ -357,6 +360,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
 		placementConnection->hadDML = false;
 		placementConnection->userName = MemoryContextStrdup(TopTransactionContext, userName);
+		placementConnection->placementId = placementAccess->placement->placementId;
 
 		/* record association with connection */
 		dlist_push_tail(&chosenConnection->referencedPlacements,
@@ -785,6 +789,14 @@ ConnectionAccessedDifferentPlacement(MultiConnection *connection,
 		ConnectionReference *connectionReference =
 			dlist_container(ConnectionReference, connectionNode, placementIter.cur);
 
+		/* handle append and range distributed tables */
+		if (placement->partitionMethod != DISTRIBUTE_BY_HASH &&
+			placement->placementId != connectionReference->placementId)
+		{
+			return true;
+		}
+
+		/* handle hash distributed tables */
 		if (placement->colocationGroupId != INVALID_COLOCATION_ID &&
 			placement->colocationGroupId == connectionReference->colocationGroupId &&
 			placement->representativeValue != connectionReference->representativeValue)
diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c
index 91d0e9506..1ff3c3fc9 100644
--- a/src/backend/distributed/executor/multi_client_executor.c
+++ b/src/backend/distributed/executor/multi_client_executor.c
@@ -22,6 +22,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_server_executor.h"
+#include "distributed/placement_connection.h"
 #include "distributed/remote_commands.h"
 
 #include
@@ -175,6 +176,55 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
 }
 
 
+/*
+ * MultiClientPlacementConnectStart asynchronously tries to establish a connection
+ * for a particular set of shard placements. If it succeeds, it returns the
+ * connection id.
+ * Otherwise, it reports a connection error and returns
+ * INVALID_CONNECTION_ID.
+ */
+int32
+MultiClientPlacementConnectStart(List *placementAccessList, const char *userName)
+{
+	MultiConnection *connection = NULL;
+	ConnStatusType connStatusType = CONNECTION_OK;
+	int32 connectionId = AllocateConnectionId();
+	int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
+
+	if (connectionId == INVALID_CONNECTION_ID)
+	{
+		ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
+		return connectionId;
+	}
+
+	/* prepare asynchronous request for worker node connection */
+	connection = StartPlacementListConnection(connectionFlags, placementAccessList,
+											  userName);
+
+	ClaimConnectionExclusively(connection);
+
+	connStatusType = PQstatus(connection->pgConn);
+
+	/*
+	 * If prepared, we save the connection, and set its initial polling status
+	 * to PGRES_POLLING_WRITING as specified in "Database Connection Control
+	 * Functions" section of the PostgreSQL documentation.
+	 */
+	if (connStatusType != CONNECTION_BAD)
+	{
+		ClientConnectionArray[connectionId] = connection;
+		ClientPollingStatusArray[connectionId] = PGRES_POLLING_WRITING;
+	}
+	else
+	{
+		ReportConnectionError(connection, WARNING);
+
+		connectionId = INVALID_CONNECTION_ID;
+	}
+
+	return connectionId;
+}
+
+
 /* MultiClientConnectPoll returns the status of client connection. */
 ConnectStatus
 MultiClientConnectPoll(int32 connectionId)
@@ -229,6 +279,14 @@ MultiClientConnectPoll(int32 connectionId)
 }
 
 
+/* MultiClientGetConnection returns the connection with the given ID from the pool */
+MultiConnection *
+MultiClientGetConnection(int32 connectionId)
+{
+	return ClientConnectionArray[connectionId];
+}
+
+
 /* MultiClientDisconnect disconnects the connection. */
 void
 MultiClientDisconnect(int32 connectionId)
@@ -247,6 +305,40 @@ MultiClientDisconnect(int32 connectionId)
 }
 
 
+/*
+ * MultiClientReleaseConnection removes a connection from the client
+ * executor pool. If it is run within a coordinated transaction, the
+ * connection is kept open; otherwise it is disconnected.
+ *
+ * This allows the connection to be used for other operations in the
+ * same transaction. The connection will still be closed at COMMIT
+ * or ABORT time.
+ */
+void
+MultiClientReleaseConnection(int32 connectionId)
+{
+	MultiConnection *connection = NULL;
+	const int InvalidPollingStatus = -1;
+
+	Assert(connectionId != INVALID_CONNECTION_ID);
+	connection = ClientConnectionArray[connectionId];
+	Assert(connection != NULL);
+
+	/* allow using same connection only in the same transaction */
+	if (!InCoordinatedTransaction())
+	{
+		MultiClientDisconnect(connectionId);
+	}
+	else
+	{
+		UnclaimConnection(connection);
+	}
+
+	ClientConnectionArray[connectionId] = NULL;
+	ClientPollingStatusArray[connectionId] = InvalidPollingStatus;
+}
+
+
 /*
  * MultiClientConnectionUp checks if the connection status is up, in other words,
  * it is not bad.
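[Editor's sketch, not part of the patch: the snippet below illustrates how the MultiClientPlacementConnectStart / MultiClientReleaseConnection pair added above is meant to be driven. The caller name is invented, the busy-wait stands in for the executor's real event loop, and CLIENT_CONNECTION_READY / CLIENT_CONNECTION_BAD are assumed to be the existing ConnectStatus values declared in multi_client_executor.h.]

/* Illustrative only; not part of the patch. */
static void
ExecuteTaskOnPlacementSketch(List *placementAccessList)
{
	/* reuses an existing transaction-bound connection when one matches */
	int32 connectionId =
		MultiClientPlacementConnectStart(placementAccessList, NULL);
	ConnectStatus pollStatus = CLIENT_CONNECTION_BAD;

	if (connectionId == INVALID_CONNECTION_ID)
	{
		return; /* caller would fail over to another placement */
	}

	/* simplified busy-wait; the real executor waits on socket readiness */
	pollStatus = MultiClientConnectPoll(connectionId);
	while (pollStatus != CLIENT_CONNECTION_READY &&
		   pollStatus != CLIENT_CONNECTION_BAD)
	{
		pollStatus = MultiClientConnectPoll(connectionId);
	}

	if (pollStatus == CLIENT_CONNECTION_BAD)
	{
		MultiClientDisconnect(connectionId);
		return;
	}

	/* ... send the task query and fetch its results here ... */

	/*
	 * Release rather than disconnect: inside a coordinated transaction the
	 * connection stays open (and is closed at COMMIT/ABORT time) so later
	 * statements can reuse it; outside a transaction this simply disconnects.
	 */
	MultiClientReleaseConnection(connectionId);
}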
diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c
index 9b7ea9388..63a8dcccb 100644
--- a/src/backend/distributed/executor/multi_real_time_executor.c
+++ b/src/backend/distributed/executor/multi_real_time_executor.c
@@ -22,6 +22,7 @@
 #include
 #include
 
+#include "access/xact.h"
 #include "commands/dbcommands.h"
 #include "distributed/citus_custom_scan.h"
 #include "distributed/connection_management.h"
@@ -29,6 +30,7 @@
 #include "distributed/multi_executor.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_resowner.h"
+#include "distributed/multi_router_executor.h"
 #include "distributed/multi_server_executor.h"
 #include "distributed/resource_lock.h"
 #include "distributed/worker_protocol.h"
@@ -88,6 +90,10 @@ MultiRealTimeExecute(Job *job)
 	workerNodeList = ActiveReadableNodeList();
 	workerHash = WorkerHash(workerHashName, workerNodeList);
 
+	if (IsTransactionBlock())
+	{
+		BeginOrContinueCoordinatedTransaction();
+	}
 
 	/* initialize task execution structures for remote execution */
 	foreach(taskCell, taskList)
@@ -259,8 +265,6 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 	TaskExecStatus currentStatus = taskStatusArray[currentIndex];
 	List *taskPlacementList = task->taskPlacementList;
 	ShardPlacement *taskPlacement = list_nth(taskPlacementList, currentIndex);
-	char *nodeName = taskPlacement->nodeName;
-	uint32 nodePort = taskPlacement->nodePort;
 	ConnectAction connectAction = CONNECT_ACTION_NONE;
 
 	/* as most state transitions don't require blocking, default to not waiting */
@@ -271,13 +275,18 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 		case EXEC_TASK_CONNECT_START:
 		{
 			int32 connectionId = INVALID_CONNECTION_ID;
-			char *nodeDatabase = NULL;
+			List *relationShardList = task->relationShardList;
+			List *placementAccessList = NIL;
 
-			/* we use the same database name on the master and worker nodes */
-			nodeDatabase = get_database_name(MyDatabaseId);
+			/* create placement accesses for placements that appear in a subselect */
+			placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
+														   relationShardList);
 
-			connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase,
-												   NULL);
+			/* should at least have an entry for the anchor shard */
+			Assert(list_length(placementAccessList) > 0);
+
+			connectionId = MultiClientPlacementConnectStart(placementAccessList,
+															NULL);
 			connectionIdArray[currentIndex] = connectionId;
 
 			/* if valid, poll the connection until the connection is initiated */
@@ -352,6 +361,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 
 		case EXEC_TASK_FAILED:
 		{
+			bool raiseError = true;
+
 			/*
 			 * On task failure, we close the connection. We also reset our execution
 			 * status assuming that we might fail on all other worker nodes and come
@@ -359,6 +370,15 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 			 * and compute task(s) on this node again.
 			 */
 			int32 connectionId = connectionIdArray[currentIndex];
+			MultiConnection *connection = MultiClientGetConnection(connectionId);
+
+			/*
+			 * If this connection was previously marked as critical (e.g. it was used
+			 * to perform a DDL command), then throw an error. Otherwise, mark it
+			 * as failed and continue executing the query.
+			 */
+			MarkRemoteTransactionFailed(connection, raiseError);
+
 			MultiClientDisconnect(connectionId);
 			connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
 			connectAction = CONNECT_ACTION_CLOSED;
@@ -582,7 +602,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 				taskStatusArray[currentIndex] = EXEC_TASK_DONE;
 
 				/* we are done executing; we no longer need the connection */
-				MultiClientDisconnect(connectionId);
+				MultiClientReleaseConnection(connectionId);
 				connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
 				connectAction = CONNECT_ACTION_CLOSED;
 			}
diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 438307e1f..3261fcc4a 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -594,6 +594,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
 		{
 			placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
 														   relationShardList);
+
+			Assert(list_length(placementAccessList) == list_length(relationShardList));
 		}
 		else
 		{
@@ -634,7 +636,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
 /*
  * BuildPlacementSelectList builds a list of SELECT placement accesses
  * which can be used to call StartPlacementListConnection or
- * GetPlacementListConnection.
+ * GetPlacementListConnection. If the node group does not have a placement
+ * (e.g. in case of a broadcast join) then the shard is skipped.
  */
 List *
 BuildPlacementSelectList(uint32 groupId, List *relationShardList)
@@ -651,8 +654,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList)
 		placement = FindShardPlacementOnGroup(groupId, relationShard->shardId);
 		if (placement == NULL)
 		{
-			ereport(ERROR, (errmsg("no active placement of shard %ld found on group %d",
-								   relationShard->shardId, groupId)));
+			continue;
 		}
 
 		placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
@@ -887,6 +889,8 @@ GetModifyConnections(Task *task, bool markCritical)
 		placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
 													   relationShardList);
 
+		Assert(list_length(placementAccessList) == list_length(relationShardList));
+
 		/* create placement access for the placement that we're modifying */
 		placementModification = CreatePlacementAccess(taskPlacement,
 													  PLACEMENT_ACCESS_DML);
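[Editor's sketch, not part of the patch: the helper below is invented; it restates the contract behind the Assert calls added above. Router SELECT and modify tasks expect every relation shard to resolve to a placement on the target group, so BuildPlacementSelectList returns one access per relation shard; only real-time SELECT tasks may legitimately see shards skipped (e.g. broadcast joins), which is why the real-time executor merely asserts the list is non-empty.]

/* Illustrative only: the relationship the asserts above encode. */
static void
CheckPlacementAccessListSketch(uint32 groupId, List *relationShardList,
							   bool isModifyTask)
{
	List *placementAccessList =
		BuildPlacementSelectList(groupId, relationShardList);

	if (isModifyTask)
	{
		/* modify tasks: no shard of the task may be missing on the group */
		Assert(list_length(placementAccessList) == list_length(relationShardList));
	}
	else
	{
		/* real-time SELECTs: broadcast-join shards may be skipped */
		Assert(list_length(placementAccessList) > 0);
	}
}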
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 22ad67953..c6ea01a4a 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -160,6 +160,7 @@ static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragment
 static StringInfo NodeNameArrayString(List *workerNodeList);
 static StringInfo NodePortArrayString(List *workerNodeList);
 static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId);
+static List * BuildRelationShardList(List *rangeTableList, List *fragmentList);
 static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
 static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment);
@@ -2529,6 +2530,8 @@ SqlTaskList(Job *job)
 
 		sqlTask = CreateBasicTask(jobId, taskIdIndex, SQL_TASK, sqlQueryString->data);
 		sqlTask->dependedTaskList = dataFetchTaskList;
+		sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
+															fragmentCombination);
 
 		/* log the query string we generated */
 		ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
@@ -3964,6 +3967,40 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin
 }
 
 
+/*
+ * BuildRelationShardList builds a list of RelationShard pairs for a task.
+ * This represents the mapping of range table entries to shard IDs for a
+ * task for the purposes of locking, deparsing, and connection management.
+ */
+static List *
+BuildRelationShardList(List *rangeTableList, List *fragmentList)
+{
+	List *relationShardList = NIL;
+	ListCell *fragmentCell = NULL;
+
+	foreach(fragmentCell, fragmentList)
+	{
+		RangeTableFragment *fragment = (RangeTableFragment *) lfirst(fragmentCell);
+		Index rangeTableId = fragment->rangeTableId;
+		RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableId, rangeTableList);
+
+		CitusRTEKind fragmentType = fragment->fragmentType;
+		if (fragmentType == CITUS_RTE_RELATION)
+		{
+			ShardInterval *shardInterval = (ShardInterval *) fragment->fragmentReference;
+			RelationShard *relationShard = CitusMakeNode(RelationShard);
+
+			relationShard->relationId = rangeTableEntry->relid;
+			relationShard->shardId = shardInterval->shardId;
+
+			relationShardList = lappend(relationShardList, relationShard);
+		}
+	}
+
+	return relationShardList;
+}
+
+
 /*
  * UpdateRangeTableAlias walks over each fragment in the given fragment list,
  * and creates an alias that represents the fragment name to be used in the
diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h
index 0b6561ea3..474e787a5 100644
--- a/src/include/distributed/multi_client_executor.h
+++ b/src/include/distributed/multi_client_executor.h
@@ -14,6 +14,11 @@
 #ifndef MULTI_CLIENT_EXECUTOR_H
 #define MULTI_CLIENT_EXECUTOR_H
 
+
+#include "distributed/connection_management.h"
+#include "nodes/pg_list.h"
+
+
 #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
 #define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
 #define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */
@@ -99,8 +104,12 @@ extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
 								const char *nodeDatabase, const char *nodeUser);
 extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
 									 const char *nodeDatabase, const char *nodeUser);
+extern int32 MultiClientPlacementConnectStart(List *placementAccessList,
+											  const char *userName);
 extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
+extern MultiConnection * MultiClientGetConnection(int32 connectionId);
 extern void MultiClientDisconnect(int32 connectionId);
+extern void MultiClientReleaseConnection(int32 connectionId);
 extern bool MultiClientConnectionUp(int32 connectionId);
 extern bool MultiClientExecute(int32 connectionId, const char *query,
 							   void **queryResult, int *rowCount, int *columnCount);
diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out
index 864895486..b64360297 100644
--- a/src/test/regress/expected/multi_insert_select.out
+++ b/src/test/regress/expected/multi_insert_select.out
@@ -2132,13 +2132,21 @@ RETURNING *;
 DEBUG: distributed INSERT ... SELECT can only select from distributed tables
 ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
 RESET client_min_messages;
--- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
+-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported
 TRUNCATE raw_events_first;
 BEGIN;
 INSERT INTO raw_events_first (user_id, value_1)
 SELECT s, s FROM generate_series(1, 5) s;
 SELECT user_id, value_1 FROM raw_events_first;
-ERROR: cannot open new connections after the first modification command within a transaction
+ user_id | value_1 
+---------+---------
+       1 |       1
+       5 |       5
+       3 |       3
+       4 |       4
+       2 |       2
+(5 rows)
+
 ROLLBACK;
 -- INSERT ... SELECT and single-shard SELECT in the same transaction is supported
 TRUNCATE raw_events_first;
diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out
new file mode 100644
index 000000000..7c43d5285
--- /dev/null
+++ b/src/test/regress/expected/multi_real_time_transaction.out
@@ -0,0 +1,233 @@
+SET citus.next_shard_id TO 1610000;
+CREATE SCHEMA multi_real_time_transaction;
+SET search_path = 'multi_real_time_transaction';
+SET citus.shard_replication_factor to 1;
+CREATE TABLE test_table(id int, col_1 int, col_2 text);
+SELECT create_distributed_table('test_table','id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+\COPY test_table FROM stdin delimiter ',';
+CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
+SELECT create_distributed_table('co_test_table','id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+\COPY co_test_table FROM stdin delimiter ',';
+CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
+SELECT create_reference_table('ref_test_table');
+ create_reference_table 
+------------------------
+ 
+(1 row)
+
+\COPY ref_test_table FROM stdin delimiter ',';
+-- Test with select and router insert
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+INSERT INTO test_table VALUES(7,8,'gg');
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     7
+(1 row)
+
+ROLLBACK;
+-- Test with select and multi-row insert
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     9
+(1 row)
+
+ROLLBACK;
+-- Test with INSERT .. SELECT
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+INSERT INTO test_table SELECT * FROM co_test_table;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+    12
+(1 row)
+
+ROLLBACK;
+-- Test with COPY
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+\COPY test_table FROM stdin delimiter ',';
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     9
+(1 row)
+
+ROLLBACK;
+-- Test with router update
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  27
+(1 row)
+
+UPDATE test_table SET col_1 = 0 WHERE id = 2;
+DELETE FROM test_table WHERE id = 3;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  20
+(1 row)
+
+ROLLBACK;
+-- Test with multi-shard update
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  27
+(1 row)
+
+UPDATE test_table SET col_1 = 5;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  30
+(1 row)
+
+ROLLBACK;
+-- Test with subqueries
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  27
+(1 row)
+
+UPDATE
+	test_table
+SET
+	col_1 = 4
+WHERE
+	test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
+	AND test_table.id = 1;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  29
+(1 row)
+
+ROLLBACK;
+-- Test with partitioned table
+CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
+SET citus.shard_replication_factor TO 1;
+-- create its partitions
+CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
+CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
+-- load some data and distribute tables
+INSERT INTO partitioning_test VALUES (1, '2009-06-06');
+INSERT INTO partitioning_test VALUES (2, '2010-07-07');
+SELECT create_distributed_table('partitioning_test', 'id');
+NOTICE: Copying data from local table...
+NOTICE: Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+BEGIN;
+SELECT COUNT(*) FROM partitioning_test;
+ count 
+-------
+     2
+(1 row)
+
+INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
+INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
+SELECT COUNT(*) FROM partitioning_test;
+ count 
+-------
+     4
+(1 row)
+
+COMMIT;
+DROP TABLE partitioning_test;
+-- Test with create-drop table
+BEGIN;
+CREATE TABLE test_table_inn(id int, num_1 int);
+SELECT create_distributed_table('test_table_inn','id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
+SELECT COUNT(*) FROM test_table_inn;
+ count 
+-------
+     3
+(1 row)
+
+DROP TABLE test_table_inn;
+COMMIT;
+-- Test with utility functions
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+CREATE INDEX tt_ind_1 ON test_table(col_1);
+ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+ROLLBACK;
+-- Test with foreign key
+ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
+ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
+BEGIN;
+DELETE FROM test_table where id = 1 or id = 3;
+SELECT * FROM co_test_table;
+ id | col_1 | col_2  
+----+-------+--------
+  2 |    30 | 'bb10'
+(1 row)
+
+ROLLBACK;
+DROP SCHEMA multi_real_time_transaction CASCADE;
+NOTICE: drop cascades to 3 other objects
+DETAIL: drop cascades to table test_table
+drop cascades to table co_test_table
+drop cascades to table ref_test_table
diff --git a/src/test/regress/expected/multi_real_time_transaction_0.out b/src/test/regress/expected/multi_real_time_transaction_0.out
new file mode 100644
index 000000000..c335b0a73
--- /dev/null
+++ b/src/test/regress/expected/multi_real_time_transaction_0.out
@@ -0,0 +1,241 @@
+SET citus.next_shard_id TO 1610000;
+CREATE SCHEMA multi_real_time_transaction;
+SET search_path = 'multi_real_time_transaction';
+SET citus.shard_replication_factor to 1;
+CREATE TABLE test_table(id int, col_1 int, col_2 text);
+SELECT create_distributed_table('test_table','id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+\COPY test_table FROM stdin delimiter ',';
+CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
+SELECT create_distributed_table('co_test_table','id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+\COPY co_test_table FROM stdin delimiter ',';
+CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
+SELECT create_reference_table('ref_test_table');
+ create_reference_table 
+------------------------
+ 
+(1 row)
+
+\COPY ref_test_table FROM stdin delimiter ',';
+-- Test with select and router insert
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+INSERT INTO test_table VALUES(7,8,'gg');
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     7
+(1 row)
+
+ROLLBACK;
+-- Test with select and multi-row insert
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     9
+(1 row)
+
+ROLLBACK;
+-- Test with INSERT .. SELECT
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+INSERT INTO test_table SELECT * FROM co_test_table;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+    12
+(1 row)
+
+ROLLBACK;
+-- Test with COPY
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+\COPY test_table FROM stdin delimiter ',';
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     9
+(1 row)
+
+ROLLBACK;
+-- Test with router update
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  27
+(1 row)
+
+UPDATE test_table SET col_1 = 0 WHERE id = 2;
+DELETE FROM test_table WHERE id = 3;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  20
+(1 row)
+
+ROLLBACK;
+-- Test with multi-shard update
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  27
+(1 row)
+
+UPDATE test_table SET col_1 = 5;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  30
+(1 row)
+
+ROLLBACK;
+-- Test with subqueries
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  27
+(1 row)
+
+UPDATE
+	test_table
+SET
+	col_1 = 4
+WHERE
+	test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
+	AND test_table.id = 1;
+SELECT SUM(col_1) FROM test_table;
+ sum 
+-----
+  29
+(1 row)
+
+ROLLBACK;
+-- Test with partitioned table
+CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
+ERROR: syntax error at or near "PARTITION"
+LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
+                                                          ^
+SET citus.shard_replication_factor TO 1;
+-- create its partitions
+CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
+ERROR: syntax error at or near "PARTITION"
+LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
+                                            ^
+CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
+ERROR: syntax error at or near "PARTITION"
+LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin...
+                                            ^
+-- load some data and distribute tables
+INSERT INTO partitioning_test VALUES (1, '2009-06-06');
+ERROR: relation "partitioning_test" does not exist
+LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06');
+                    ^
+INSERT INTO partitioning_test VALUES (2, '2010-07-07');
+ERROR: relation "partitioning_test" does not exist
+LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07');
+                    ^
+SELECT create_distributed_table('partitioning_test', 'id');
+ERROR: relation "partitioning_test" does not exist
+LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
+                                        ^
+BEGIN;
+SELECT COUNT(*) FROM partitioning_test;
+ERROR: relation "partitioning_test" does not exist
+LINE 1: SELECT COUNT(*) FROM partitioning_test;
+                             ^
+INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
+ERROR: current transaction is aborted, commands ignored until end of transaction block
+INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
+ERROR: current transaction is aborted, commands ignored until end of transaction block
+SELECT COUNT(*) FROM partitioning_test;
+ERROR: current transaction is aborted, commands ignored until end of transaction block
+COMMIT;
+DROP TABLE partitioning_test;
+ERROR: table "partitioning_test" does not exist
+-- Test with create-drop table
+BEGIN;
+CREATE TABLE test_table_inn(id int, num_1 int);
+SELECT create_distributed_table('test_table_inn','id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
+SELECT COUNT(*) FROM test_table_inn;
+ count 
+-------
+     3
+(1 row)
+
+DROP TABLE test_table_inn;
+COMMIT;
+-- Test with utility functions
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+CREATE INDEX tt_ind_1 ON test_table(col_1);
+ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
+SELECT COUNT(*) FROM test_table;
+ count 
+-------
+     6
+(1 row)
+
+ROLLBACK;
+-- Test with foreign key
+ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
+ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
+BEGIN;
+DELETE FROM test_table where id = 1 or id = 3;
+SELECT * FROM co_test_table;
+ id | col_1 | col_2  
+----+-------+--------
+  2 |    30 | 'bb10'
+(1 row)
+
+ROLLBACK;
+DROP SCHEMA multi_real_time_transaction CASCADE;
+NOTICE: drop cascades to 3 other objects
+DETAIL: drop cascades to table test_table
+drop cascades to table co_test_table
+drop cascades to table ref_test_table
diff --git a/src/test/regress/input/multi_alter_table_statements.source b/src/test/regress/input/multi_alter_table_statements.source
index 78bdccde6..099c5db88 100644
--- a/src/test/regress/input/multi_alter_table_statements.source
+++ b/src/test/regress/input/multi_alter_table_statements.source
@@ -315,13 +315,13 @@ DROP FUNCTION log_ddl_tag();
 DROP TABLE ddl_commands;
 \c - - - :master_port
--- Distributed SELECTs cannot appear after ALTER
+-- Distributed SELECTs may appear after ALTER
 BEGIN;
 CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
 SELECT count(*) FROM lineitem_alter;
-COMMIT;
+ROLLBACK;
 
--- but are allowed before
+-- and before
 BEGIN;
 SELECT count(*) FROM lineitem_alter;
 CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 03201aca4..09f18eeda 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -32,6 +32,7 @@ test: multi_read_from_secondaries
 test: multi_create_table
 test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
 test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_insert_select_window multi_shard_update_delete
+test: multi_real_time_transaction
 
 # ----------
 # Tests for partitioning support
diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source
index 332cc35ad..1dd7973b9 100644
--- a/src/test/regress/output/multi_alter_table_statements.source
+++ b/src/test/regress/output/multi_alter_table_statements.source
@@ -673,13 +673,17 @@ DROP EVENT TRIGGER log_ddl_tag;
 DROP FUNCTION log_ddl_tag();
 DROP TABLE ddl_commands;
 \c - - - :master_port
--- Distributed SELECTs cannot appear after ALTER
+-- Distributed SELECTs may appear after ALTER
 BEGIN;
 CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
 SELECT count(*) FROM lineitem_alter;
-ERROR: cannot open new connections after the first modification command within a transaction
-COMMIT;
--- but are allowed before
+ count 
+-------
+ 18000
+(1 row)
+
+ROLLBACK;
+-- and before
 BEGIN;
 SELECT count(*) FROM lineitem_alter;
  count 
diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql
index 51a0632ab..618f3f971 100644
--- a/src/test/regress/sql/multi_insert_select.sql
+++ b/src/test/regress/sql/multi_insert_select.sql
@@ -1716,7 +1716,7 @@ RETURNING *;
 
 RESET client_min_messages;
 
--- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
+-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported
 TRUNCATE raw_events_first;
 
 BEGIN;
diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql
new file mode 100644
index 000000000..229d11d85
--- /dev/null
+++ b/src/test/regress/sql/multi_real_time_transaction.sql
@@ -0,0 +1,147 @@
+SET citus.next_shard_id TO 1610000;
+
+CREATE SCHEMA multi_real_time_transaction;
+SET search_path = 'multi_real_time_transaction';
+SET citus.shard_replication_factor to 1;
+
+CREATE TABLE test_table(id int, col_1 int, col_2 text);
+SELECT create_distributed_table('test_table','id');
+\COPY test_table FROM stdin delimiter ',';
+1,2,'aa'
+2,3,'bb'
+3,4,'cc'
+4,5,'dd'
+5,6,'ee'
+6,7,'ff'
+\.
+
+CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
+SELECT create_distributed_table('co_test_table','id');
+\COPY co_test_table FROM stdin delimiter ',';
+1,20,'aa10'
+2,30,'bb10'
+3,40,'cc10'
+3,4,'cc1'
+3,5,'cc2'
+1,2,'cc2'
+\.
+
+
+CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
+SELECT create_reference_table('ref_test_table');
+\COPY ref_test_table FROM stdin delimiter ',';
+1,2,'rr1'
+2,3,'rr2'
+3,4,'rr3'
+4,5,'rr4'
+\.
+
+-- Test with select and router insert
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+INSERT INTO test_table VALUES(7,8,'gg');
+SELECT COUNT(*) FROM test_table;
+ROLLBACK;
+
+-- Test with select and multi-row insert
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
+SELECT COUNT(*) FROM test_table;
+ROLLBACK;
+
+-- Test with INSERT .. SELECT
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+INSERT INTO test_table SELECT * FROM co_test_table;
+SELECT COUNT(*) FROM test_table;
+ROLLBACK;
+
+-- Test with COPY
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+\COPY test_table FROM stdin delimiter ',';
+8,9,'gg'
+9,10,'hh'
+10,11,'ii'
+\.
+SELECT COUNT(*) FROM test_table;
+ROLLBACK;
+
+-- Test with router update
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+UPDATE test_table SET col_1 = 0 WHERE id = 2;
+DELETE FROM test_table WHERE id = 3;
+SELECT SUM(col_1) FROM test_table;
+ROLLBACK;
+
+-- Test with multi-shard update
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+UPDATE test_table SET col_1 = 5;
+SELECT SUM(col_1) FROM test_table;
+ROLLBACK;
+
+-- Test with subqueries
+BEGIN;
+SELECT SUM(col_1) FROM test_table;
+UPDATE
+	test_table
+SET
+	col_1 = 4
+WHERE
+	test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1)
+	AND test_table.id = 1;
+SELECT SUM(col_1) FROM test_table;
+ROLLBACK;
+
+-- Test with partitioned table
+CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
+SET citus.shard_replication_factor TO 1;
+
+-- create its partitions
+CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
+CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
+
+-- load some data and distribute tables
+INSERT INTO partitioning_test VALUES (1, '2009-06-06');
+INSERT INTO partitioning_test VALUES (2, '2010-07-07');
+SELECT create_distributed_table('partitioning_test', 'id');
+
+BEGIN;
+SELECT COUNT(*) FROM partitioning_test;
+INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
+INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
+SELECT COUNT(*) FROM partitioning_test;
+COMMIT;
+
+DROP TABLE partitioning_test;
+
+-- Test with create-drop table
+BEGIN;
+CREATE TABLE test_table_inn(id int, num_1 int);
+SELECT create_distributed_table('test_table_inn','id');
+INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
+SELECT COUNT(*) FROM test_table_inn;
+DROP TABLE test_table_inn;
+COMMIT;
+
+-- Test with utility functions
+BEGIN;
+SELECT COUNT(*) FROM test_table;
+CREATE INDEX tt_ind_1 ON test_table(col_1);
+ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
+SELECT COUNT(*) FROM test_table;
+ROLLBACK;
+
+-- Test with foreign key
+ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
+ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
+
+BEGIN;
+DELETE FROM test_table where id = 1 or id = 3;
+SELECT * FROM co_test_table;
+ROLLBACK;
+
+DROP SCHEMA multi_real_time_transaction CASCADE;
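[Editor's sketch, not part of the patch: the helper below is invented; it condenses the matching rule ConnectionAccessedDifferentPlacement applies per ConnectionReference after this change. Append- and range-distributed placements are told apart by placementId, while hash-distributed placements conflict only within the same co-location group, when the connection already accessed a shard with a different representative value.]

/* Illustrative only; mirrors the checks added in placement_connection.c above. */
static bool
ReferenceConflictsWithPlacement(ShardPlacement *placement,
								ConnectionReference *connectionReference)
{
	if (placement->partitionMethod != DISTRIBUTE_BY_HASH)
	{
		/* append/range: any other placement already on this connection conflicts */
		return placement->placementId != connectionReference->placementId;
	}

	/* hash: conflict is scoped to the placement's co-location group */
	return placement->colocationGroupId != INVALID_COLOCATION_ID &&
		   placement->colocationGroupId == connectionReference->colocationGroupId &&
		   placement->representativeValue != connectionReference->representativeValue;
}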