From 132d9212d074409fb8f7b1d6ab493d9b962ca6e4 Mon Sep 17 00:00:00 2001
From: eren
Date: Fri, 22 Apr 2016 18:01:19 +0300
Subject: [PATCH] ADD master_modify_multiple_shards UDF

Fixes #10

This change creates a new UDF: master_modify_multiple_shards

Parameters:
	modify_query: A simple DELETE or UPDATE query as a string.

The UDF is similar to the existing master_apply_delete_command UDF.
Basically, given the modify query, it prunes the shard list,
reconstructs the query for each shard and sends the query to the
placements. Depending on the value of citus.multi_shard_commit_protocol,
the commit can be done in a one-phase or two-phase manner.

Limitations:
	* It cannot be called inside a transaction block
	* It can only be called with simple operator expressions (like Single
	Shard Modify)

Sample Usage:
```
SELECT master_modify_multiple_shards(
	'DELETE FROM customer_delete_protocol WHERE c_custkey > 500 AND c_custkey < 600');
```
---
 src/backend/distributed/Makefile              |   6 +-
 .../distributed/citus--5.1-1--5.1-2.sql       |   6 +
 src/backend/distributed/citus.control         |   2 +-
 src/backend/distributed/commands/multi_copy.c | 150 +------
 .../master/master_delete_protocol.c           |   2 +-
 .../master/master_modify_multiple_shards.c    | 409 ++++++++++++++++++
 .../planner/multi_router_planner.c            |   3 +-
 .../distributed/utils/connection_cache.c      |   1 +
 .../distributed/utils/multi_transaction.c     |  78 ++++
 src/backend/distributed/utils/resource_lock.c |  30 ++
 .../distributed/utils/shardinterval_utils.c   |  28 ++
 src/include/distributed/connection_cache.h    |   4 +-
 src/include/distributed/master_protocol.h     |   2 +-
 .../distributed/multi_router_planner.h        |   1 +
 src/include/distributed/multi_transaction.h   |  13 +
 src/include/distributed/resource_lock.h       |   2 +
 src/include/distributed/shardinterval_utils.h |   2 +-
 src/test/regress/expected/multi_extension.out |   1 +
 .../regress/expected/multi_shard_modify.out   | 261 +++++++++++
 .../input/multi_master_delete_protocol.source |   1 +
 src/test/regress/multi_schedule               |   4 +-
 .../multi_master_delete_protocol.source       |   1 +
 src/test/regress/pg_regress_multi.pl          |   1 +
 src/test/regress/sql/multi_extension.sql      |   1 +
 src/test/regress/sql/multi_shard_modify.sql   | 153 +++++++
 25 files changed, 1002 insertions(+), 160 deletions(-)
 create mode 100644 src/backend/distributed/citus--5.1-1--5.1-2.sql
 create mode 100644 src/backend/distributed/master/master_modify_multiple_shards.c
 create mode 100644 src/test/regress/expected/multi_shard_modify.out
 create mode 100644 src/test/regress/sql/multi_shard_modify.sql

diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index 6a26d1cb0..9679e662e 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ - 5.1-1 + 5.1-1 5.1-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -35,7 +35,9 @@ $(EXTENSION)--5.0-2.sql: $(EXTENSION)--5.0-1.sql $(EXTENSION)--5.0-1--5.0-2.sql cat $^ > $@ $(EXTENSION)--5.1-1.sql: $(EXTENSION)--5.0-2.sql $(EXTENSION)--5.0-2--5.1-1.sql cat $^ > $@ - +$(EXTENSION)--5.1-2.sql: $(EXTENSION)--5.1-1.sql $(EXTENSION)--5.1-1--5.1-2.sql + cat $^ > $@ + NO_PGXS = 1 SHLIB_LINK = $(libpq) diff --git a/src/backend/distributed/citus--5.1-1--5.1-2.sql b/src/backend/distributed/citus--5.1-1--5.1-2.sql new file mode 100644 index 000000000..7a4f56eee --- /dev/null +++ b/src/backend/distributed/citus--5.1-1--5.1-2.sql @@ -0,0 +1,6 @@ +CREATE FUNCTION pg_catalog.master_modify_multiple_shards(text) + RETURNS integer + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_modify_multiple_shards$$; +COMMENT ON FUNCTION master_modify_multiple_shards(text) + IS 'push delete and update queries to shards'; \ No newline at end of file diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index b89b19b43..76ff52de4 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '5.1-1' +default_version = '5.1-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index ed4242305..f1f1715ba 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -125,9 +125,6 @@ #include "utils/memutils.h" -#define INITIAL_CONNECTION_CACHE_SIZE 1001 - - /* constant used in binary protocol */ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -135,26 +132,12 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; static PGconn *masterConnection = NULL; -/* ShardConnections represents a set of connections for each placement of a shard */ -typedef struct ShardConnections -{ - int64 shardId; - List *connectionList; -} ShardConnections; - - /* Local functions forward declarations */ static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag); static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag); static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); static char MasterPartitionMethod(RangeVar *relation); static void RemoveMasterOptions(CopyStmt *copyStatement); -static void LockAllShards(List *shardIntervalList); -static HTAB * CreateShardConnectionHash(void); -static int CompareShardIntervalsById(const void *leftElement, const void *rightElement); -static ShardConnections * GetShardConnections(HTAB *shardConnectionHash, - int64 shardId, - bool *shardConnectionsFound); static void OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, bool stopOnFailure); static List * MasterShardPlacementList(uint64 shardId); @@ -165,7 +148,6 @@ static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId); -static List * ConnectionList(HTAB *connectionHash); static void EndRemoteCopy(List *connectionList, bool 
stopOnFailure); static void ReportCopyError(PGconn *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); @@ -432,7 +414,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) } /* prevent concurrent placement changes and non-commutative DML statements */ - LockAllShards(shardIntervalList); + LockShards(shardIntervalList, ShareLock); /* initialize the shard interval cache */ shardCount = cacheEntry->shardIntervalArrayLength; @@ -883,111 +865,6 @@ RemoveMasterOptions(CopyStmt *copyStatement) } -/* - * LockAllShards takes shared locks on the metadata and the data of all shards in - * shardIntervalList. This prevents concurrent placement changes and concurrent - * DML statements that require an exclusive lock. - */ -static void -LockAllShards(List *shardIntervalList) -{ - ListCell *shardIntervalCell = NULL; - - /* lock shards in order of shard id to prevent deadlock */ - shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - int64 shardId = shardInterval->shardId; - - /* prevent concurrent changes to number of placements */ - LockShardDistributionMetadata(shardId, ShareLock); - - /* prevent concurrent update/delete statements */ - LockShardResource(shardId, ShareLock); - } -} - - -/* - * CreateShardConnectionHash constructs a hash table used for shardId->Connection - * mapping. - */ -static HTAB * -CreateShardConnectionHash(void) -{ - HTAB *shardConnectionsHash = NULL; - int hashFlags = 0; - HASHCTL info; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(int64); - info.entrysize = sizeof(ShardConnections); - info.hash = tag_hash; - - hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; - shardConnectionsHash = hash_create("Shard Connections Hash", - INITIAL_CONNECTION_CACHE_SIZE, &info, - hashFlags); - - return shardConnectionsHash; -} - - -/* - * CompareShardIntervalsById is a comparison function for sort shard - * intervals by their shard ID. - */ -static int -CompareShardIntervalsById(const void *leftElement, const void *rightElement) -{ - ShardInterval *leftInterval = *((ShardInterval **) leftElement); - ShardInterval *rightInterval = *((ShardInterval **) rightElement); - int64 leftShardId = leftInterval->shardId; - int64 rightShardId = rightInterval->shardId; - - /* we compare 64-bit integers, instead of casting their difference to int */ - if (leftShardId > rightShardId) - { - return 1; - } - else if (leftShardId < rightShardId) - { - return -1; - } - else - { - return 0; - } -} - - -/* - * GetShardConnections finds existing connections for a shard in the hash - * or opens new connections to each active placement and starts a (binary) COPY - * transaction on each of them. - */ -static ShardConnections * -GetShardConnections(HTAB *shardConnectionHash, int64 shardId, - bool *shardConnectionsFound) -{ - ShardConnections *shardConnections = NULL; - - shardConnections = (ShardConnections *) hash_search(shardConnectionHash, - &shardId, - HASH_ENTER, - shardConnectionsFound); - if (!*shardConnectionsFound) - { - shardConnections->shardId = shardId; - shardConnections->connectionList = NIL; - } - - return shardConnections; -} - - /* * OpenCopyTransactions opens a connection for each placement of a shard and * starts a COPY transaction. 
If a connection cannot be opened, then the shard
@@ -1251,31 +1128,6 @@ SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId
 }
 
 
-/*
- * ConnectionList flattens the connection hash to a list of placement connections.
- */
-static List *
-ConnectionList(HTAB *connectionHash)
-{
-	List *connectionList = NIL;
-	HASH_SEQ_STATUS status;
-	ShardConnections *shardConnections = NULL;
-
-	hash_seq_init(&status, connectionHash);
-
-	shardConnections = (ShardConnections *) hash_seq_search(&status);
-	while (shardConnections != NULL)
-	{
-		List *shardConnectionsList = list_copy(shardConnections->connectionList);
-		connectionList = list_concat(connectionList, shardConnectionsList);
-
-		shardConnections = (ShardConnections *) hash_seq_search(&status);
-	}
-
-	return connectionList;
-}
-
-
 /*
  * EndRemoteCopy ends the COPY input on all connections. If stopOnFailure
  * is true, then EndRemoteCopy reports an error on failure, otherwise it
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index fa3e7f20b..09c50d5b5 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -163,7 +163,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
 
 
 /*
- * master_drop_shards attempts to drop all shards for a given relation.
+ * master_drop_all_shards attempts to drop all shards for a given relation.
 * Unlike master_apply_delete_command, this function can be called even
 * if the table has already been dropped.
 */
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
new file mode 100644
index 000000000..e4144e6b1
--- /dev/null
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -0,0 +1,409 @@
+/*-------------------------------------------------------------------------
+ *
+ * master_modify_multiple_shards.c
+ *	  UDF to run multi shard update/delete queries
+ *
+ * This file contains the master_modify_multiple_shards function, which takes
+ * an update or delete query and runs it on the worker shards of the
+ * distributed table. The distributed modify operation can be done within a
+ * distributed transaction and committed in one-phase or two-phase fashion,
+ * depending on the citus.multi_shard_commit_protocol setting.
+ *
+ * Copyright (c) 2012-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "funcapi.h"
+#include "libpq-fe.h"
+#include "miscadmin.h"
+
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "commands/dbcommands.h"
+#include "commands/event_trigger.h"
+#include "distributed/citus_ruleutils.h"
+#include "distributed/connection_cache.h"
+#include "distributed/listutils.h"
+#include "distributed/master_metadata_utility.h"
+#include "distributed/master_protocol.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/multi_client_executor.h"
+#include "distributed/multi_physical_planner.h"
+#include "distributed/multi_router_executor.h"
+#include "distributed/multi_router_planner.h"
+#include "distributed/multi_server_executor.h"
+#include "distributed/multi_transaction.h"
+#include "distributed/pg_dist_shard.h"
+#include "distributed/pg_dist_partition.h"
+#include "distributed/resource_lock.h"
+#include "distributed/worker_protocol.h"
+#include "optimizer/clauses.h"
+#include "optimizer/predtest.h"
+#include "optimizer/restrictinfo.h"
+#include "optimizer/var.h"
+#include "nodes/makefuncs.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+
+
+static void LockShardsForModify(List *shardIntervalList);
+static bool HasReplication(List *shardIntervalList);
+static int SendQueryToShards(Query *query, List *shardIntervalList);
+static HTAB * OpenConnectionsToAllShardPlacements(List *shardIntervalList);
+static void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash);
+static int SendQueryToPlacements(char *shardQueryString,
+								 ShardConnections *shardConnections);
+
+PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
+
+
+/*
+ * master_modify_multiple_shards takes in a DELETE or UPDATE query string and
+ * pushes the query to shards. It finds shards that match the criteria defined
+ * in the modify command, generates the same query string for each of the
+ * found shards with the distributed table name replaced with the shard name,
+ * and sends the queries to the workers. It uses one-phase or two-phase commit
+ * transactions depending on the citus.multi_shard_commit_protocol value.
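+ *
+ * A usage sketch (the table name and predicate below are illustrative only,
+ * they are not part of this patch):
+ *
+ *   SELECT master_modify_multiple_shards(
+ *       'DELETE FROM github_events WHERE created_at < ''2016-01-01''');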
+ */
+Datum
+master_modify_multiple_shards(PG_FUNCTION_ARGS)
+{
+	text *queryText = PG_GETARG_TEXT_P(0);
+	char *queryString = text_to_cstring(queryText);
+	List *queryTreeList = NIL;
+	Oid relationId = InvalidOid;
+	Index tableId = 1;
+	Query *modifyQuery = NULL;
+	Node *queryTreeNode;
+	List *restrictClauseList = NIL;
+	bool isTopLevel = true;
+	bool failOK = false;
+	List *shardIntervalList = NIL;
+	List *prunedShardIntervalList = NIL;
+	int32 affectedTupleCount = 0;
+
+	PreventTransactionChain(isTopLevel, "master_modify_multiple_shards");
+
+	queryTreeNode = ParseTreeNode(queryString);
+	if (IsA(queryTreeNode, DeleteStmt))
+	{
+		DeleteStmt *deleteStatement = (DeleteStmt *) queryTreeNode;
+		relationId = RangeVarGetRelid(deleteStatement->relation, NoLock, failOK);
+		EnsureTablePermissions(relationId, ACL_DELETE);
+	}
+	else if (IsA(queryTreeNode, UpdateStmt))
+	{
+		UpdateStmt *updateStatement = (UpdateStmt *) queryTreeNode;
+		relationId = RangeVarGetRelid(updateStatement->relation, NoLock, failOK);
+		EnsureTablePermissions(relationId, ACL_UPDATE);
+	}
+	else
+	{
+		ereport(ERROR, (errmsg("query \"%s\" is not a delete or update statement",
+							   queryString)));
+	}
+
+	CheckDistributedTable(relationId);
+
+	queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0);
+	modifyQuery = (Query *) linitial(queryTreeList);
+
+	ErrorIfModifyQueryNotSupported(modifyQuery);
+
+	shardIntervalList = LoadShardIntervalList(relationId);
+	restrictClauseList = WhereClauseList(modifyQuery->jointree);
+
+	prunedShardIntervalList =
+		PruneShardList(relationId, tableId, restrictClauseList, shardIntervalList);
+
+	CHECK_FOR_INTERRUPTS();
+
+	LockShardsForModify(prunedShardIntervalList);
+
+	affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList);
+
+	PG_RETURN_INT32(affectedTupleCount);
+}
+
+
+/*
+ * LockShardsForModify locks the shards in the given shard interval list. Its
+ * lock logic is slightly different from the plain LockShards function:
+ *
+ * 1. If citus.all_modifications_commutative is set to true, then all locks
+ * are acquired as ShareLock.
+ * 2. If citus.all_modifications_commutative is false, then the shards are
+ * locked with ExclusiveLock if any shard in the list has two or more
+ * replicas. Otherwise, the locks are acquired as ShareLock.
+ */
+static void
+LockShardsForModify(List *shardIntervalList)
+{
+	LOCKMODE lockMode = NoLock;
+
+	if (AllModificationsCommutative)
+	{
+		lockMode = ShareLock;
+	}
+	else if (!HasReplication(shardIntervalList))	/* no shard has more than one replica */
+	{
+		lockMode = ShareLock;
+	}
+	else
+	{
+		lockMode = ExclusiveLock;
+	}
+
+	LockShards(shardIntervalList, lockMode);
+}
+
+
+/*
+ * HasReplication checks whether any of the shards in the given list has more
+ * than one replica.
+ */
+static bool
+HasReplication(List *shardIntervalList)
+{
+	ListCell *shardIntervalCell;
+	bool hasReplication = false;
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		uint64 shardId = shardInterval->shardId;
+		List *shardPlacementList = FinalizedShardPlacementList(shardId);
+		if (shardPlacementList->length > 1)
+		{
+			hasReplication = true;
+		}
+	}
+
+	return hasReplication;
+}
+
+
+/*
+ * SendQueryToShards executes the given query in all placements of the given
+ * shard list and returns the total affected tuple count.
+ * The execution is done in a distributed transaction and the commit protocol
+ * is decided according to the value of the citus.multi_shard_commit_protocol
+ * parameter. SendQueryToShards does not acquire locks on the shards, so the
+ * caller is expected to acquire the necessary shard locks before calling
+ * SendQueryToShards.
+ */
+static int
+SendQueryToShards(Query *query, List *shardIntervalList)
+{
+	int affectedTupleCount = 0;
+	HTAB *shardConnectionHash = OpenConnectionsToAllShardPlacements(shardIntervalList);
+	List *allShardsConnectionList = ConnectionList(shardConnectionHash);
+
+	PG_TRY();
+	{
+		ListCell *shardIntervalCell = NULL;
+
+		foreach(shardIntervalCell, shardIntervalList)
+		{
+			ShardInterval *shardInterval = (ShardInterval *) lfirst(
+				shardIntervalCell);
+			Oid relationId = shardInterval->relationId;
+			uint64 shardId = shardInterval->shardId;
+			bool shardConnectionsFound = false;
+			ShardConnections *shardConnections = NULL;
+			StringInfo shardQueryString = makeStringInfo();
+			char *shardQueryStringData = NULL;
+			int shardAffectedTupleCount = -1;
+
+			shardConnections = GetShardConnections(shardConnectionHash,
+												   shardId,
+												   &shardConnectionsFound);
+			Assert(shardConnectionsFound);
+
+			deparse_shard_query(query, relationId, shardId, shardQueryString);
+			shardQueryStringData = shardQueryString->data;
+			shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
+															shardConnections);
+			affectedTupleCount += shardAffectedTupleCount;
+		}
+
+		if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
+		{
+			PrepareRemoteTransactions(allShardsConnectionList);
+		}
+
+		/* check for cancellation one last time before returning */
+		CHECK_FOR_INTERRUPTS();
+	}
+	PG_CATCH();
+	{
+		/* roll back all transactions */
+		AbortRemoteTransactions(allShardsConnectionList);
+		CloseConnections(allShardsConnectionList);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	CommitRemoteTransactions(allShardsConnectionList, false);
+	CloseConnections(allShardsConnectionList);
+
+	return affectedTupleCount;
+}
+
+
+/*
+ * OpenConnectionsToAllShardPlacements opens connections to all placements of
+ * every shard in the given shard list and returns the hash table containing
+ * the connections. The resulting hash table maps shardId to a ShardConnections
+ * struct.
+ */
+static HTAB *
+OpenConnectionsToAllShardPlacements(List *shardIntervalList)
+{
+	HTAB *shardConnectionHash = CreateShardConnectionHash();
+
+	ListCell *shardIntervalCell = NULL;
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		uint64 shardId = shardInterval->shardId;
+
+		OpenConnectionsToShardPlacements(shardId, shardConnectionHash);
+	}
+
+	return shardConnectionHash;
+}
+
+
+/*
+ * OpenConnectionsToShardPlacements opens connections to all placements of the
+ * shard with the given shardId and populates the shardConnectionHash table
+ * accordingly.
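+ *
+ * If a connection to any placement cannot be established, all connections
+ * opened so far are closed and an error is raised, so the modification is
+ * never attempted on a partial set of placements.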
+ */
+static void
+OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash)
+{
+	bool shardConnectionsFound = false;
+
+	/* get existing connections to the shard placements, if any */
+	ShardConnections *shardConnections = GetShardConnections(shardConnectionHash,
+															 shardId,
+															 &shardConnectionsFound);
+
+	List *shardPlacementList = FinalizedShardPlacementList(shardId);
+	ListCell *shardPlacementCell = NULL;
+	List *connectionList = NIL;
+
+	Assert(!shardConnectionsFound);
+
+	if (shardPlacementList == NIL)
+	{
+		ereport(ERROR, (errmsg("could not find any shard placements for the shard "
+							   UINT64_FORMAT, shardId)));
+	}
+
+	foreach(shardPlacementCell, shardPlacementList)
+	{
+		ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(
+			shardPlacementCell);
+		char *workerName = shardPlacement->nodeName;
+		uint32 workerPort = shardPlacement->nodePort;
+		char *nodeUser = CurrentUserName();
+		PGconn *connection = ConnectToNode(workerName, workerPort, nodeUser);
+		TransactionConnection *transactionConnection = NULL;
+
+		if (connection == NULL)
+		{
+			List *abortConnectionList = ConnectionList(shardConnectionHash);
+			CloseConnections(abortConnectionList);
+
+			ereport(ERROR, (errmsg("could not establish a connection to all "
+								   "placements")));
+		}
+
+		transactionConnection = palloc0(sizeof(TransactionConnection));
+
+		transactionConnection->connectionId = shardConnections->shardId;
+		transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
+		transactionConnection->connection = connection;
+
+		connectionList = lappend(connectionList, transactionConnection);
+	}
+
+	shardConnections->connectionList = connectionList;
+}
+
+
+/*
+ * SendQueryToPlacements sends the given query string to all given placement
+ * connections of a shard. The query is sent with a BEGIN before the actual
+ * query, so CommitRemoteTransactions or AbortRemoteTransactions should be
+ * called after all queries have been sent successfully.
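+ *
+ * All placements of the shard must report the same affected tuple count;
+ * otherwise an error is raised, since diverging counts would mean that the
+ * placements are no longer consistent with each other.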
+ */
+static int
+SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections)
+{
+	uint64 shardId = shardConnections->shardId;
+	List *connectionList = shardConnections->connectionList;
+	ListCell *connectionCell = NULL;
+	int32 shardAffectedTupleCount = -1;
+
+	Assert(connectionList != NIL);
+
+	foreach(connectionCell, connectionList)
+	{
+		TransactionConnection *transactionConnection =
+			(TransactionConnection *) lfirst(connectionCell);
+		PGconn *connection = transactionConnection->connection;
+		PGresult *result = NULL;
+		char *placementAffectedTupleString = NULL;
+		int32 placementAffectedTupleCount = -1;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* start the remote transaction */
+		result = PQexec(connection, "BEGIN");
+		if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		{
+			ReportRemoteError(connection, result);
+			ereport(ERROR, (errmsg("could not begin transaction on shard placement")));
+		}
+
+		/* send the query */
+		result = PQexec(connection, shardQueryString);
+		if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		{
+			ReportRemoteError(connection, result);
+			ereport(ERROR, (errmsg("could not send query to shard placement")));
+		}
+
+		placementAffectedTupleString = PQcmdTuples(result);
+		placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
+											  sizeof(int32), 0);
+
+		if ((shardAffectedTupleCount == -1) ||
+			(shardAffectedTupleCount == placementAffectedTupleCount))
+		{
+			shardAffectedTupleCount = placementAffectedTupleCount;
+		}
+		else
+		{
+			ereport(ERROR,
+					(errmsg("modified %d tuples, but expected to modify %d",
+							placementAffectedTupleCount, shardAffectedTupleCount),
+					 errdetail("Affected tuple counts at placements of shard "
+							   UINT64_FORMAT " are different.", shardId)));
+		}
+
+		PQclear(result);
+
+		transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
+	}
+
+	return shardAffectedTupleCount;
+}
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index a7d2c2b97..f94e30259 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -56,7 +56,6 @@
 
 
 /* planner functions forward declarations */
-static void ErrorIfModifyQueryNotSupported(Query *queryTree);
 static Task * RouterModifyTask(Query *query);
 #if (PG_VERSION_NUM >= 90500)
 static OnConflictExpr * RebuildOnConflict(Oid relationId,
@@ -124,7 +123,7 @@ MultiRouterPlanCreate(Query *query)
 * ErrorIfModifyQueryNotSupported checks if the query contains unsupported features,
 * and errors out if it does.
*/ -static void +void ErrorIfModifyQueryNotSupported(Query *queryTree) { Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index a93b21ec8..3f05a0ce4 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -23,6 +23,7 @@ #include "distributed/metadata_cache.h" #include "lib/stringinfo.h" #include "mb/pg_wchar.h" +#include "nodes/pg_list.h" #include "utils/builtins.h" #include "utils/elog.h" #include "utils/errcodes.h" diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c index a7863025e..f9885f4e0 100644 --- a/src/backend/distributed/utils/multi_transaction.c +++ b/src/backend/distributed/utils/multi_transaction.c @@ -20,6 +20,9 @@ #include "nodes/pg_list.h" +#define INITIAL_CONNECTION_CACHE_SIZE 1001 + + /* Local functions forward declarations */ static uint32 DistributedTransactionId = 0; @@ -269,3 +272,78 @@ CloseConnections(List *connectionList) PQfinish(connection); } } + + +/* + * CreateShardConnectionHash constructs a hash table used for shardId->Connection + * mapping. + */ +HTAB * +CreateShardConnectionHash(void) +{ + HTAB *shardConnectionsHash = NULL; + int hashFlags = 0; + HASHCTL info; + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardConnections); + info.hash = tag_hash; + + hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; + shardConnectionsHash = hash_create("Shard Connections Hash", + INITIAL_CONNECTION_CACHE_SIZE, &info, + hashFlags); + + return shardConnectionsHash; +} + + +/* + * GetShardConnections finds existing connections for a shard in the hash. + * If not found, then a ShardConnections structure with empty connectionList + * is returned. + */ +ShardConnections * +GetShardConnections(HTAB *shardConnectionHash, int64 shardId, + bool *shardConnectionsFound) +{ + ShardConnections *shardConnections = NULL; + + shardConnections = (ShardConnections *) hash_search(shardConnectionHash, + &shardId, + HASH_ENTER, + shardConnectionsFound); + if (!*shardConnectionsFound) + { + shardConnections->shardId = shardId; + shardConnections->connectionList = NIL; + } + + return shardConnections; +} + + +/* + * ConnectionList flattens the connection hash to a list of placement connections. 
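+ * The connection lists of all ShardConnections entries are concatenated into
+ * a single flat list, which can be passed directly to the remote transaction
+ * helpers such as PrepareRemoteTransactions and CommitRemoteTransactions.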
+ */
+List *
+ConnectionList(HTAB *connectionHash)
+{
+	List *connectionList = NIL;
+	HASH_SEQ_STATUS status;
+	ShardConnections *shardConnections = NULL;
+
+	hash_seq_init(&status, connectionHash);
+
+	shardConnections = (ShardConnections *) hash_seq_search(&status);
+	while (shardConnections != NULL)
+	{
+		List *shardConnectionsList = list_copy(shardConnections->connectionList);
+		connectionList = list_concat(connectionList, shardConnectionsList);
+
+		shardConnections = (ShardConnections *) hash_seq_search(&status);
+	}
+
+	return connectionList;
+}
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index 3250e4943..ba748e890 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -17,8 +17,11 @@
 #include "c.h"
 #include "miscadmin.h"
 
+#include "distributed/listutils.h"
+#include "distributed/master_metadata_utility.h"
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shardinterval_utils.h"
 #include "storage/lmgr.h"
 
 
@@ -119,3 +122,30 @@ UnlockJobResource(uint64 jobId, LOCKMODE lockmode)
 
 	LockRelease(&tag, lockmode, sessionLock);
 }
+
+
+/*
+ * LockShards takes locks on the metadata and the data of all shards in
+ * shardIntervalList, using the given lock mode. This prevents concurrent
+ * placement changes and concurrent DML statements that would conflict with
+ * the given lock mode.
+ */
+void
+LockShards(List *shardIntervalList, LOCKMODE lockMode)
+{
+	ListCell *shardIntervalCell = NULL;
+
+	/* lock shards in order of shard id to prevent deadlock */
+	shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById);
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		int64 shardId = shardInterval->shardId;
+
+		/* prevent concurrent changes to number of placements */
+		LockShardDistributionMetadata(shardId, lockMode);
+
+		/* prevent concurrent update/delete statements */
+		LockShardResource(shardId, lockMode);
+	}
+}
diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c
index 68acb56e0..fe0d4899f 100644
--- a/src/backend/distributed/utils/shardinterval_utils.c
+++ b/src/backend/distributed/utils/shardinterval_utils.c
@@ -78,6 +78,34 @@ CompareShardIntervals(const void *leftElement, const void *rightElement,
 }
 
 
+/*
+ * CompareShardIntervalsById is a comparison function for sorting shard
+ * intervals by their shard ID.
+ */
+int
+CompareShardIntervalsById(const void *leftElement, const void *rightElement)
+{
+	ShardInterval *leftInterval = *((ShardInterval **) leftElement);
+	ShardInterval *rightInterval = *((ShardInterval **) rightElement);
+	int64 leftShardId = leftInterval->shardId;
+	int64 rightShardId = rightInterval->shardId;
+
+	/* we compare 64-bit integers, instead of casting their difference to int */
+	if (leftShardId > rightShardId)
+	{
+		return 1;
+	}
+	else if (leftShardId < rightShardId)
+	{
+		return -1;
+	}
+	else
+	{
+		return 0;
+	}
+}
+
+
 /*
 * FindShardInterval finds a single shard interval in the cache for the
 * given partition column value.
diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index ecc6b6c7b..fb4f01a89 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -16,6 +16,8 @@ #include "c.h" #include "libpq-fe.h" +#include "nodes/pg_list.h" +#include "utils/hsearch.h" /* maximum duration to wait for connection */ #define CLIENT_CONNECT_TIMEOUT_SECONDS "5" @@ -57,6 +59,4 @@ extern void PurgeConnection(PGconn *connection); extern void ReportRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword); - - #endif /* CONNECTION_CACHE_H */ diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 136d55f76..427658bde 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -67,7 +67,6 @@ "SELECT master_update_shard_statistics(%ld)" #define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');" - /* Enumeration that defines the shard placement policy to use while staging */ typedef enum { @@ -107,6 +106,7 @@ extern Datum master_create_empty_shard(PG_FUNCTION_ARGS); extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS); extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS); extern Datum master_apply_delete_command(PG_FUNCTION_ARGS); +extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS); extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); /* function declarations for shard creation functionality */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 693c314e4..90bbe0e37 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -31,6 +31,7 @@ #endif extern MultiPlan * MultiRouterPlanCreate(Query *query); +extern void ErrorIfModifyQueryNotSupported(Query *queryTree); extern bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/multi_transaction.h b/src/include/distributed/multi_transaction.h index c4e5c8f2d..7a50eb274 100644 --- a/src/include/distributed/multi_transaction.h +++ b/src/include/distributed/multi_transaction.h @@ -47,6 +47,14 @@ typedef struct TransactionConnection } TransactionConnection; +/* ShardConnections represents a set of connections for each placement of a shard */ +typedef struct ShardConnections +{ + int64 shardId; + List *connectionList; +} ShardConnections; + + /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; @@ -57,6 +65,11 @@ extern void PrepareRemoteTransactions(List *connectionList); extern void AbortRemoteTransactions(List *connectionList); extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); extern void CloseConnections(List *connectionList); +extern HTAB * CreateShardConnectionHash(void); +extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash, + int64 shardId, + bool *shardConnectionsFound); +extern List * ConnectionList(HTAB *connectionHash); #endif /* MULTI_TRANSACTION_H */ diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index dc374b0ad..92ae6b7fc 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -13,6 +13,7 @@ 
#include "postgres.h" /* IWYU pragma: keep */ #include "c.h" +#include "nodes/pg_list.h" #include "storage/lock.h" @@ -74,5 +75,6 @@ extern void UnlockShardResource(uint64 shardId, LOCKMODE lockmode); extern void LockJobResource(uint64 jobId, LOCKMODE lockmode); extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); +extern void LockShards(List *shardIntervalList, LOCKMODE lockMode); #endif /* RESOURCE_LOCK_H */ diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h index 9d8d1ccf2..1c745f014 100644 --- a/src/include/distributed/shardinterval_utils.h +++ b/src/include/distributed/shardinterval_utils.h @@ -25,11 +25,11 @@ typedef struct ShardIntervalCompareFunctionCacheEntry extern int CompareShardIntervals(const void *leftElement, const void *rightElement, FmgrInfo *typeCompareFunction); +extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement); extern ShardInterval * FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, char partitionMethod, FmgrInfo *compareFunction, FmgrInfo *hashFunction, bool useBinarySearch); - #endif /* SHARDINTERVAL_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 974e4468e..15ae05798 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -13,6 +13,7 @@ CREATE EXTENSION citus VERSION '5.0'; ALTER EXTENSION citus UPDATE TO '5.0-1'; ALTER EXTENSION citus UPDATE TO '5.0-2'; ALTER EXTENSION citus UPDATE TO '5.1-1'; +ALTER EXTENSION citus UPDATE TO '5.1-2'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out new file mode 100644 index 000000000..03a5ec3a8 --- /dev/null +++ b/src/test/regress/expected/multi_shard_modify.out @@ -0,0 +1,261 @@ +-- +-- MULTI_SHARD_MODIFY +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 350000; +-- Create a new hash partitioned multi_shard_modify_test table and stage data into it. 
+CREATE TABLE multi_shard_modify_test ( + t_key integer not null, + t_name varchar(25) not null, + t_value integer not null); +SELECT master_create_distributed_table('multi_shard_modify_test', 't_key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('multi_shard_modify_test', 4, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'csv'); +-- Testing master_modify_multiple_shards +-- Verify that master_modify_multiple_shards cannot be called in a transaction block +BEGIN; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13'); +ERROR: master_modify_multiple_shards cannot run inside a transaction block +ROLLBACK; +-- Check that master_modify_multiple_shards cannot be called with non-distributed tables +CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text); +INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder'); +SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table WHERE col_1 = 37'); +ERROR: relation "temporary_nondistributed_table" is not a distributed table +-- commands with volatile functions in their quals +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +-- commands with stable functions in their quals +CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +-- commands with immutable functions in their quals +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +-- DELETE with expression in WHERE clause +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (3*18-40)'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +-- commands with a USING clause are unsupported +CREATE TEMP TABLE temp_nations(name text, key integer); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' '); +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +-- commands with a RETURNING clause are unsupported +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 3 RETURNING *'); +ERROR: cannot perform distributed planning for the given modification +DETAIL: RETURNING clauses are not supported in distributed modifications. 
+-- commands containing a CTE are unsupported +SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test'); +ERROR: cannot perform distributed planning for the given modification +DETAIL: Common table expressions are not supported in distributed modifications. +-- Check that we can successfully delete from multiple shards with 1PC +SET citus.multi_shard_commit_protocol TO '1pc'; +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 25 +(1 row) + +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 200'); + master_modify_multiple_shards +------------------------------- + 2 +(1 row) + +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 23 +(1 row) + +-- Check that we can successfully delete from multiple shards with 2PC +SET citus.multi_shard_commit_protocol TO '2pc'; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 100'); + master_modify_multiple_shards +------------------------------- + 2 +(1 row) + +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 21 +(1 row) + +-- Check that shard pruning works +SET client_min_messages TO DEBUG2; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 15'); +DEBUG: predicate pruning for shardId 350001 +DEBUG: predicate pruning for shardId 350002 +DEBUG: predicate pruning for shardId 350003 + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +SET client_min_messages TO NOTICE; +-- Check that master_modify_multiple_shards works without partition keys +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_name LIKE ''barce%'' '); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +-- Simple, Single Shard Update +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''warsaw'' WHERE t_key=17'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +SELECT t_name FROM multi_shard_modify_test WHERE t_key=17; + t_name +-------- + warsaw +(1 row) + +-- Simple, Multi Shard Update +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''???'' WHERE t_key>30 AND t_key<35'); + master_modify_multiple_shards +------------------------------- + 4 +(1 row) + +SELECT t_name FROM multi_shard_modify_test WHERE t_key>30 AND t_key<35; + t_name +-------- + ??? + ??? + ??? + ??? 
+(4 rows) + +-- expression UPDATE +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value=8*37 WHERE t_key>30 AND t_key<35'); + master_modify_multiple_shards +------------------------------- + 4 +(1 row) + +SELECT t_value FROM multi_shard_modify_test WHERE t_key>30 AND t_key<35; + t_value +--------- + 296 + 296 + 296 + 296 +(4 rows) + +-- multi-column UPDATE +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''somename'', t_value=333 WHERE t_key>30 AND t_key<35'); + master_modify_multiple_shards +------------------------------- + 4 +(1 row) + +SELECT t_name, t_value FROM multi_shard_modify_test WHERE t_key>30 AND t_key<35; + t_name | t_value +----------+--------- + somename | 333 + somename | 333 + somename | 333 + somename | 333 +(4 rows) + +-- commands with no constraints on the partition key are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''nice city'' WHERE t_value < 0'); + master_modify_multiple_shards +------------------------------- + 2 +(1 row) + +SELECT t_name FROM multi_shard_modify_test WHERE t_value < 0; + t_name +----------- + nice city + nice city +(2 rows) + +-- attempting to change the partition key is unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_key=3000 WHERE t_key < 10 '); +ERROR: modifying the partition value of rows is not allowed +-- UPDATEs with a FROM clause are unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL'' FROM temp_nations WHERE multi_shard_modify_test.t_key = 3 AND multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''dummy'' '); +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +-- commands with a RETURNING clause are unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''FAIL'' WHERE t_key=4 RETURNING *'); +ERROR: cannot perform distributed planning for the given modification +DETAIL: RETURNING clauses are not supported in distributed modifications. +-- commands containing a CTE are unsupported +SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' '); +ERROR: cannot perform distributed planning for the given modification +DETAIL: Common table expressions are not supported in distributed modifications. 
+-- updates referencing just a var are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value=t_key WHERE t_key = 10'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; + t_value +--------- + 10 +(1 row) + +-- updates referencing a column are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; + t_value +--------- + 47 +(1 row) + +-- updates referencing non-IMMUTABLE functions are unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +-- updates referencing IMMUTABLE functions in SET section are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; + t_value +--------- + 78 +(1 row) + +-- updates referencing STABLE functions in SET section are not supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +-- updates referencing VOLATILE functions in SET section are not supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10'); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046; diff --git a/src/test/regress/input/multi_master_delete_protocol.source b/src/test/regress/input/multi_master_delete_protocol.source index b9fca8695..646e0adf8 100644 --- a/src/test/regress/input/multi_master_delete_protocol.source +++ b/src/test/regress/input/multi_master_delete_protocol.source @@ -19,6 +19,7 @@ SELECT master_create_distributed_table('customer_delete_protocol', 'c_custkey', \STAGE customer_delete_protocol FROM '@abs_srcdir@/data/customer.2.data' with delimiter '|' \STAGE customer_delete_protocol FROM '@abs_srcdir@/data/customer.3.data' with delimiter '|' +-- Testing master_apply_delete_command -- Check that we don't support conditions on columns other than partition key. 
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 310836ba3..7d0879508 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -71,7 +71,9 @@ test: multi_large_table_task_assignment # ---------- # Tests to check our large record staging and shard deletion behavior # ---------- -test: multi_stage_large_records multi_master_delete_protocol +test: multi_stage_large_records +test: multi_master_delete_protocol +test: multi_shard_modify # ---------- # Tests around DDL statements run on distributed tables diff --git a/src/test/regress/output/multi_master_delete_protocol.source b/src/test/regress/output/multi_master_delete_protocol.source index 3ec22f2c2..a1de0751c 100644 --- a/src/test/regress/output/multi_master_delete_protocol.source +++ b/src/test/regress/output/multi_master_delete_protocol.source @@ -20,6 +20,7 @@ SELECT master_create_distributed_table('customer_delete_protocol', 'c_custkey', \STAGE customer_delete_protocol FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|' \STAGE customer_delete_protocol FROM '@abs_srcdir@/data/customer.2.data' with delimiter '|' \STAGE customer_delete_protocol FROM '@abs_srcdir@/data/customer.3.data' with delimiter '|' +-- Testing master_apply_delete_command -- Check that we don't support conditions on columns other than partition key. SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol WHERE c_acctbal > 0.0'); diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index eec3f22cb..09ef3085c 100644 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -90,6 +90,7 @@ push(@pgOptions, '-c', "listen_addresses='${host}'"); push(@pgOptions, '-c', "unix_socket_directories="); push(@pgOptions, '-c', "fsync=off"); push(@pgOptions, '-c', "shared_preload_libraries=citus"); +push(@pgOptions, '-c', "max_prepared_transactions=100"); # Citus options set for the tests push(@pgOptions, '-c', "citus.shard_max_size=300kB"); diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 24d33438f..dc3030a55 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -15,6 +15,7 @@ CREATE EXTENSION citus VERSION '5.0'; ALTER EXTENSION citus UPDATE TO '5.0-1'; ALTER EXTENSION citus UPDATE TO '5.0-2'; ALTER EXTENSION citus UPDATE TO '5.1-1'; +ALTER EXTENSION citus UPDATE TO '5.1-2'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_shard_modify.sql b/src/test/regress/sql/multi_shard_modify.sql new file mode 100644 index 000000000..76c511889 --- /dev/null +++ b/src/test/regress/sql/multi_shard_modify.sql @@ -0,0 +1,153 @@ +-- +-- MULTI_SHARD_MODIFY +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 350000; + +-- Create a new hash partitioned multi_shard_modify_test table and stage data into it. 
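+-- The table is created with 4 shards and replication factor 2, so the
+-- multi-shard commands below touch multiple placements per shard.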
+CREATE TABLE multi_shard_modify_test ( + t_key integer not null, + t_name varchar(25) not null, + t_value integer not null); +SELECT master_create_distributed_table('multi_shard_modify_test', 't_key', 'hash'); +SELECT master_create_worker_shards('multi_shard_modify_test', 4, 2); + +COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'csv'); +1,san francisco,99 +2,istanbul,34 +3,paris,46 +4,london,91 +5,toronto,98 +6,london,44 +7,stockholm,21 +8,tallinn,33 +9,helsinki,21 +10,ankara,6 +11,karabuk,78 +12,kastamonu,37 +13,samsun,55 +14,rome,13 +15,madrid,1 +16,barcelona,8 +17,poznan,12 +31,kabul,4 +32,dhaka,62 +33,iamey,121 +34,muscat,77 +41,uppsala,-1 +42,malmo,-2 +101,tokyo,106 +102,new delhi,978 +201,taipei,556 +202,beijing,754 +\. + +-- Testing master_modify_multiple_shards +-- Verify that master_modify_multiple_shards cannot be called in a transaction block +BEGIN; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13'); +ROLLBACK; + +-- Check that master_modify_multiple_shards cannot be called with non-distributed tables +CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text); +INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder'); +SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table WHERE col_1 = 37'); + +-- commands with volatile functions in their quals +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); + +-- commands with stable functions in their quals +CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); + +-- commands with immutable functions in their quals +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); + +-- DELETE with expression in WHERE clause +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (3*18-40)'); + +-- commands with a USING clause are unsupported +CREATE TEMP TABLE temp_nations(name text, key integer); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' '); + +-- commands with a RETURNING clause are unsupported +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 3 RETURNING *'); + +-- commands containing a CTE are unsupported +SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test'); + +-- Check that we can successfully delete from multiple shards with 1PC +SET citus.multi_shard_commit_protocol TO '1pc'; +SELECT count(*) FROM multi_shard_modify_test; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 200'); +SELECT count(*) FROM multi_shard_modify_test; + +-- Check that we can successfully delete from multiple shards with 2PC +SET citus.multi_shard_commit_protocol TO '2pc'; +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 100'); +SELECT count(*) FROM multi_shard_modify_test; + +-- Check that shard pruning works +SET client_min_messages TO DEBUG2; +SELECT 
master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 15'); +SET client_min_messages TO NOTICE; + +-- Check that master_modify_multiple_shards works without partition keys +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_name LIKE ''barce%'' '); + + +-- Simple, Single Shard Update +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''warsaw'' WHERE t_key=17'); +SELECT t_name FROM multi_shard_modify_test WHERE t_key=17; + +-- Simple, Multi Shard Update +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''???'' WHERE t_key>30 AND t_key<35'); +SELECT t_name FROM multi_shard_modify_test WHERE t_key>30 AND t_key<35; + +-- expression UPDATE +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value=8*37 WHERE t_key>30 AND t_key<35'); +SELECT t_value FROM multi_shard_modify_test WHERE t_key>30 AND t_key<35; + +-- multi-column UPDATE +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''somename'', t_value=333 WHERE t_key>30 AND t_key<35'); +SELECT t_name, t_value FROM multi_shard_modify_test WHERE t_key>30 AND t_key<35; + +-- commands with no constraints on the partition key are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''nice city'' WHERE t_value < 0'); +SELECT t_name FROM multi_shard_modify_test WHERE t_value < 0; + +-- attempting to change the partition key is unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_key=3000 WHERE t_key < 10 '); + +-- UPDATEs with a FROM clause are unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL'' FROM temp_nations WHERE multi_shard_modify_test.t_key = 3 AND multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''dummy'' '); + +-- commands with a RETURNING clause are unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''FAIL'' WHERE t_key=4 RETURNING *'); + +-- commands containing a CTE are unsupported +SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' '); + +-- updates referencing just a var are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value=t_key WHERE t_key = 10'); +SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; + +-- updates referencing a column are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10'); +SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; + +-- updates referencing non-IMMUTABLE functions are unsupported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); + +-- updates referencing IMMUTABLE functions in SET section are supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); +SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; + +-- updates referencing STABLE functions in SET section are not supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); + +-- updates referencing VOLATILE functions in SET section are not supported +SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value 
= random() WHERE t_key = 10'); + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;