diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 69a9fc3c9..05bcf8432 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2101,12 +2101,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->distributedRelation = distributedRelation; copyDest->tupleDescriptor = inputTupleDescriptor; - /* we don't support copy to reference tables from workers */ - if (partitionMethod == DISTRIBUTE_BY_NONE) - { - EnsureCoordinator(); - } - /* load the list of shards and verify that we have shards to copy into */ shardIntervalList = LoadShardIntervalList(tableId); if (shardIntervalList == NIL) @@ -2147,7 +2141,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, * Prevent concurrent UPDATE/DELETE on replication factor >1 * (see AcquireExecutorMultiShardLocks() at multi_router_executor.c) */ - LockShardListResources(shardIntervalList, RowExclusiveLock); + SerializeNonCommutativeWrites(shardIntervalList, RowExclusiveLock); /* keep the table metadata to avoid looking it up for every tuple */ copyDest->tableMetadata = cacheEntry; diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index d95df4a6d..572556097 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -32,7 +32,9 @@ #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" @@ -248,7 +250,9 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) if (shardId != INVALID_SHARD_ID && lockMode != NoLock) { - LockShardResource(shardId, lockMode); + ShardInterval *shardInterval = LoadShardInterval(shardId); + + SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode); } /* @@ -282,8 +286,6 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) { List *shardIntervalList = LoadShardIntervalList(relationId); - ShardInterval *referenceTableShardInterval = (ShardInterval *) linitial( - shardIntervalList); if (rowLockStrength == LCS_FORKEYSHARE || rowLockStrength == LCS_FORSHARE) { @@ -295,7 +297,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) rowLockMode = ExclusiveLock; } - LockShardResource(referenceTableShardInterval->shardId, rowLockMode); + SerializeNonCommutativeWrites(shardIntervalList, rowLockMode); } } } diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 40ecce5a4..666505e61 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -24,6 +24,7 @@ #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_router_executor.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" @@ -123,6 +124,54 @@ master_move_shard_placement(PG_FUNCTION_ARGS) } +/* + * BlockWritesToShardList blocks writes to all shards in the given shard + * list. 
The function assumes that all the shards in the list are colocated. + */ +void +BlockWritesToShardList(List *shardList) +{ + ListCell *shardCell = NULL; + + bool shouldSyncMetadata = false; + ShardInterval *firstShardInterval = NULL; + Oid firstDistributedTableId = InvalidOid; + + foreach(shardCell, shardList) + { + ShardInterval *shard = (ShardInterval *) lfirst(shardCell); + + /* + * We need to lock the referenced reference table metadata to avoid + * asynchronous shard copy in case of cascading DML operations. + */ + LockReferencedReferenceShardDistributionMetadata(shard->shardId, + ExclusiveLock); + + LockShardDistributionMetadata(shard->shardId, ExclusiveLock); + } + + /* following code relies on the list to have at least one shard */ + if (list_length(shardList) == 0) + { + return; + } + + /* + * Since the function assumes that the input shards are colocated, + * calculating shouldSyncMetadata for a single table is sufficient. + */ + firstShardInterval = (ShardInterval *) linitial(shardList); + firstDistributedTableId = firstShardInterval->relationId; + + shouldSyncMetadata = ShouldSyncTableMetadata(firstDistributedTableId); + if (shouldSyncMetadata) + { + LockShardListMetadataOnWorkers(ExclusiveLock, shardList); + } +} + + /* * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum * values to a char. diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 5ae91eeb4..4adde5852 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -183,6 +183,31 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) } +/* + * ClusterHasKnownMetadataWorkers returns true if the node executing the function + * knows at least one worker with metadata. We do it + * (a) by checking the node that executes the function is a worker with metadata + * (b) the coordinator knows at least one worker with metadata. + */ +bool +ClusterHasKnownMetadataWorkers() +{ + bool workerWithMetadata = false; + + if (GetLocalGroupId() != 0) + { + workerWithMetadata = true; + } + + if (workerWithMetadata || HasMetadataWorkers()) + { + return true; + } + + return false; +} + + /* * ShouldSyncTableMetadata checks if the metadata of a distributed table should be * propagated to metadata workers, i.e. the table is an MX table or reference table. 
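Note (illustration, not part of the patch): a minimal sketch of how the two helpers above, BlockWritesToShardList() and ClusterHasKnownMetadataWorkers(), are meant to be combined by callers. The real call sites added later in this diff (ReplicateAllReferenceTablesToNode and DeleteAllReferenceTablePlacementsFromNodeGroup in reference_table_utils.c) operate on sorted lists of reference table shards rather than a single table; the helper name below is hypothetical.

/*
 * Hypothetical caller sketch: block writes to all shards of one reference
 * table, but only when some node in the cluster has synced metadata and
 * could therefore issue writes of its own.
 */
static void
BlockWritesToReferenceTable(Oid referenceTableId)
{
	List *shardIntervalList = LoadShardIntervalList(referenceTableId);

	if (ClusterHasKnownMetadataWorkers())
	{
		BlockWritesToShardList(shardIntervalList);
	}
}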
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index b43f5ea4f..4269a2e24 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -526,7 +526,6 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); uint32 rangeTableId = 1; Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - bool isCoordinator = IsCoordinator(); List *rangeTableList = NIL; ListCell *rangeTableCell = NULL; uint32 queryTableCount = 0; @@ -586,12 +585,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer foreach(rangeTableCell, rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - bool referenceTable = false; if (rangeTableEntry->rtekind == RTE_RELATION) { - DistTableCacheEntry *distTableEntry = NULL; - if (!IsDistributedTable(rangeTableEntry->relid)) { StringInfo errorMessage = makeStringInfo(); @@ -604,22 +600,6 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer errorMessage->data, NULL, NULL); } - distTableEntry = DistributedTableCacheEntry(rangeTableEntry->relid); - if (distTableEntry->partitionMethod == DISTRIBUTE_BY_NONE) - { - referenceTable = true; - } - - if (referenceTable && !isCoordinator) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed planning for the given" - " modification", - "Modifications to reference tables are " - "supported only from the coordinator.", - NULL); - } - queryTableCount++; /* we do not expect to see a view in modify query */ @@ -2843,12 +2823,6 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC uint32 tableReplicationFactor = TableShardReplicationFactor( distributedTableId); - if (partitionMethod == DISTRIBUTE_BY_NONE) - { - EnsureCoordinator(); - } - - if (tableReplicationFactor > 1 && partitionMethod != DISTRIBUTE_BY_NONE) { return false; diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index a695437cd..e1f3224cb 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -20,6 +20,7 @@ #include "access/xact.h" #include "distributed/connection_management.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" #include "distributed/resource_lock.h" @@ -55,6 +56,30 @@ SendCommandToWorker(char *nodeName, int32 nodePort, char *command) } +/* + * SendCommandToFirstWorker sends the given command only to the first worker node + * sorted by host name and port number using SendCommandToWorker. + */ +void +SendCommandToFirstWorker(char *command) +{ + List *workerNodeList = ActivePrimaryNodeList(); + WorkerNode *firstWorkerNode = NULL; + + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + if (list_length(workerNodeList) == 0) + { + ereport(ERROR, (errmsg("cannot find a worker node"))); + } + + firstWorkerNode = (WorkerNode *) linitial(workerNodeList); + + SendCommandToWorker(firstWorkerNode->workerName, firstWorkerNode->workerPort, + command); +} + + /* * SendCommandToWorkers sends a command to all workers in * parallel. 
Commands are committed on the workers when the local diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 0a1d62cd8..99a94ba19 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -128,7 +128,9 @@ void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) { List *referenceTableList = ReferenceTableOidList(); + List *referenceShardIntervalList = NIL; ListCell *referenceTableCell = NULL; + ListCell *referenceShardIntervalCell = NULL; List *workerNodeList = ActivePrimaryNodeList(); uint32 workerCount = 0; Oid firstReferenceTableId = InvalidOid; @@ -150,6 +152,20 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) Oid referenceTableId = lfirst_oid(referenceTableCell); List *shardIntervalList = LoadShardIntervalList(referenceTableId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + + referenceShardIntervalList = lappend(referenceShardIntervalList, + shardInterval); + } + + if (ClusterHasKnownMetadataWorkers()) + { + BlockWritesToShardList(referenceShardIntervalList); + } + + foreach(referenceShardIntervalCell, referenceShardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst( + referenceShardIntervalCell); uint64 shardId = shardInterval->shardId; LockShardDistributionMetadata(shardId, ExclusiveLock); @@ -394,6 +410,7 @@ void DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId) { List *referenceTableList = ReferenceTableOidList(); + List *referenceShardIntervalList = NIL; ListCell *referenceTableCell = NULL; /* if there are no reference tables, we do not need to do anything */ @@ -407,6 +424,13 @@ DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId) * DeleteAllReferenceTablePlacementsFromNodeGroup calls. */ referenceTableList = SortList(referenceTableList, CompareOids); + if (ClusterHasKnownMetadataWorkers()) + { + referenceShardIntervalList = GetSortedReferenceShardIntervals(referenceTableList); + + BlockWritesToShardList(referenceShardIntervalList); + } + foreach(referenceTableCell, referenceTableList) { GroupShardPlacement *placement = NULL; diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 06fd61a99..aa6dc2018 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -20,7 +20,9 @@ #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/distributed_planner.h" #include "distributed/multi_router_executor.h" @@ -34,7 +36,10 @@ /* local function forward declarations */ static LOCKMODE IntToLockMode(int mode); -static List * GetSortedReferenceShardIntervals(List *relationList); +static void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); +static void LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, + List *shardIntervalList); +static bool IsFirstWorkerNode(); /* exports for SQL callable functions */ @@ -83,7 +88,7 @@ lock_shard_metadata(PG_FUNCTION_ARGS) /* - * lock_shard_resources allows shard resources to be locked + * lock_shard_resources allows shard resources to be locked * remotely to serialise non-commutative writes on shards. 
* * This function does not sort the array to avoid deadlock, callers @@ -122,6 +127,111 @@ lock_shard_resources(PG_FUNCTION_ARGS) } +/* + * LockShardListResourcesOnFirstWorker acquires the resource locks for the specified + * shards on the first worker. Acquiring a lock with or without metadata does not + * matter for us. So, worker does not have to be an MX node, acquiring the lock + * on any worker node is enough. Note that the function does not sort the shard list, + * therefore the caller should sort the shard list in order to avoid deadlocks. + */ +static void +LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList) +{ + StringInfo lockCommand = makeStringInfo(); + ListCell *shardIntervalCell = NULL; + int processedShardIntervalCount = 0; + int totalShardIntervalCount = list_length(shardIntervalList); + + appendStringInfo(lockCommand, "SELECT lock_shard_resources(%d, ARRAY[", lockmode); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + int64 shardId = shardInterval->shardId; + + appendStringInfo(lockCommand, "%lu", shardId); + + processedShardIntervalCount++; + if (processedShardIntervalCount != totalShardIntervalCount) + { + appendStringInfo(lockCommand, ", "); + } + } + + appendStringInfo(lockCommand, "])"); + + SendCommandToFirstWorker(lockCommand->data); +} + + +/* + * IsFirstWorkerNode checks whether the node is the first worker node sorted + * according to the host name and port number. + */ +static bool +IsFirstWorkerNode() +{ + List *workerNodeList = ActivePrimaryNodeList(); + WorkerNode *firstWorkerNode = NULL; + + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + if (list_length(workerNodeList) == 0) + { + return false; + } + + firstWorkerNode = (WorkerNode *) linitial(workerNodeList); + + if (firstWorkerNode->groupId == GetLocalGroupId()) + { + return true; + } + + return false; +} + + +/* + * LockShardListMetadataOnWorkers acquires the matadata locks for the specified shards on + * metadata workers. Note that the function does not sort the shard list, therefore the + * caller should sort the shard list in order to avoid deadlocks. + */ +void +LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList) +{ + StringInfo lockCommand = makeStringInfo(); + ListCell *shardIntervalCell = NULL; + int processedShardIntervalCount = 0; + int totalShardIntervalCount = list_length(shardIntervalList); + + if (list_length(shardIntervalList) == 0) + { + return; + } + + appendStringInfo(lockCommand, "SELECT lock_shard_metadata(%d, ARRAY[", lockmode); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + int64 shardId = shardInterval->shardId; + + appendStringInfo(lockCommand, "%lu", shardId); + + processedShardIntervalCount++; + if (processedShardIntervalCount != totalShardIntervalCount) + { + appendStringInfo(lockCommand, ", "); + } + } + + appendStringInfo(lockCommand, "])"); + + SendCommandToWorkers(WORKERS_WITH_METADATA, lockCommand->data); +} + + /* * IntToLockMode verifies whether the specified integer is an accepted lock mode * and returns it as a LOCKMODE enum. 
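Note (illustration, not part of the patch): the remote locking functions above build a plain SQL command and ship it with SendCommandToFirstWorker() or SendCommandToWorkers(). Below is a minimal hypothetical sketch of the command LockShardListResourcesOnFirstWorker() would send for two made-up shard ids under RowExclusiveLock, which is why IntToLockMode has to accept RowExclusiveLock on the receiving side.

/*
 * Hypothetical illustration only: shard ids 100400 and 100401 are made up.
 * On a typical 64-bit build this produces
 * "SELECT lock_shard_resources(3, ARRAY[100400, 100401])", since
 * RowExclusiveLock is lock mode 3 in PostgreSQL.
 */
static void
ShowFirstWorkerLockCommand(void)
{
	StringInfo lockCommand = makeStringInfo();

	appendStringInfo(lockCommand,
					 "SELECT lock_shard_resources(%d, ARRAY["
					 UINT64_FORMAT ", " UINT64_FORMAT "])",
					 RowExclusiveLock, (uint64) 100400, (uint64) 100401);

	SendCommandToFirstWorker(lockCommand->data);
}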
@@ -141,6 +251,10 @@ IntToLockMode(int mode) { return AccessShareLock; } + else if (mode == RowExclusiveLock) + { + return RowExclusiveLock; + } else { elog(ERROR, "unsupported lockmode %d", mode); @@ -169,6 +283,9 @@ LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode) /* * LockReferencedReferenceShardDistributionMetadata acquires the given lock * on the reference tables which has a foreign key from the given relation. + * + * It also gets metadata locks on worker nodes to prevent concurrent write + * operations on reference tables from metadata nodes. */ void LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE lock) @@ -178,8 +295,13 @@ LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE lock) DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey; - List *shardIntervalList = GetSortedReferenceShardIntervals(referencedRelationList); + + if (list_length(shardIntervalList) > 0 && ClusterHasKnownMetadataWorkers()) + { + LockShardListMetadataOnWorkers(lock, shardIntervalList); + } + foreach(shardIntervalCell, shardIntervalList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); @@ -193,7 +315,7 @@ LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE lock) * GetSortedReferenceShards iterates through the given relation list. * Lists the shards of reference tables and returns the list after sorting. */ -static List * +List * GetSortedReferenceShardIntervals(List *relationList) { List *shardIntervalList = NIL; @@ -353,11 +475,43 @@ LockShardsInPlacementListMetadata(List *shardPlacementList, LOCKMODE lockMode) } +/* + * SerializeNonCommutativeWrites acquires the required locks to prevent concurrent + * writes on the given shards. + * + * If the modified shard is a reference table's shard and the cluster is an MX + * cluster we need to get shard resource lock on the first worker node to + * prevent divergence possibility between placements of the reference table. + * + * In other workers, by acquiring a lock on the first worker, we're serializing + * non-commutative modifications to a reference table. If the node executing the + * command is the first worker, defined via IsFirstWorker(), we skip acquiring + * the lock remotely to avoid an extra round-trip and/or self-deadlocks. + * + * Finally, if we're not dealing with reference tables on MX cluster, we'll + * always acquire the lock with LockShardListResources() call. + */ +void +SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode) +{ + ShardInterval *firstShardInterval = (ShardInterval *) linitial(shardIntervalList); + int64 firstShardId = firstShardInterval->shardId; + + if (ReferenceTableShardId(firstShardId) && ClusterHasKnownMetadataWorkers() && + !IsFirstWorkerNode()) + { + LockShardListResourcesOnFirstWorker(lockMode, shardIntervalList); + } + + LockShardListResources(shardIntervalList, lockMode); +} + + /* * LockShardListResources takes locks on all shards in shardIntervalList to * prevent concurrent DML statements on those shards. 
*/ -void +static void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) { ListCell *shardIntervalCell = NULL; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 5a3b09c46..41b346ae9 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -18,6 +18,7 @@ /* Functions declarations for metadata syncing */ +extern bool ClusterHasKnownMetadataWorkers(void); extern bool ShouldSyncTableMetadata(Oid relationId); extern List * MetadataCreateCommands(void); extern List * GetDistributedTableDDLEvents(Oid relationId); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index a83519088..4d78966c0 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -13,6 +13,7 @@ #include "postgres.h" /* IWYU pragma: keep */ #include "c.h" +#include "distributed/worker_transaction.h" #include "nodes/pg_list.h" #include "storage/lock.h" @@ -66,6 +67,8 @@ typedef enum AdvisoryLocktagClass /* Lock shard/relation metadata for safe modifications */ extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); extern bool TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); +extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList); +extern void BlockWritesToShardList(List *shardList); /* Lock shard/relation metadata of the referenced reference table if exists */ extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE @@ -83,8 +86,9 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); extern void LockShardsInPlacementListMetadata(List *shardPlacementList, LOCKMODE lockMode); -extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); +extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode); extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); +extern List * GetSortedReferenceShardIntervals(List *relationList); /* Lock partitions of partitioned table */ extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index a63f12a78..4c76f77ac 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -29,6 +29,7 @@ typedef enum TargetWorkerSet /* Functions declarations for worker transactions */ extern List * GetWorkerTransactions(void); extern void SendCommandToWorker(char *nodeName, int32 nodePort, char *command); +extern void SendCommandToFirstWorker(char *command); extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command); extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList); diff --git a/src/test/regress/expected/multi_mx_create_table.out b/src/test/regress/expected/multi_mx_create_table.out index 7eff736b1..3cdccb5f2 100644 --- a/src/test/regress/expected/multi_mx_create_table.out +++ b/src/test/regress/expected/multi_mx_create_table.out @@ -2,6 +2,7 @@ -- MULTI_MX_CREATE_TABLE -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node 
----------------------------- @@ -474,20 +475,20 @@ ORDER BY colocationid, logicalrelid; mx_ddl_table | 3 | 4 | h | s app_analytics_events_mx | 3 | 4 | h | s company_employees_mx | 3 | 4 | h | s - nation_hash | 4 | 16 | h | s - citus_mx_test_schema.nation_hash | 4 | 16 | h | s - lineitem_mx | 5 | 16 | h | s - orders_mx | 5 | 16 | h | s - customer_mx | 6 | 1 | n | t - nation_mx | 6 | 1 | n | t - part_mx | 6 | 1 | n | t - supplier_mx | 6 | 1 | n | t - limit_orders_mx | 7 | 2 | h | s - articles_hash_mx | 7 | 2 | h | s - multiple_hash_mx | 8 | 2 | h | s - researchers_mx | 9 | 2 | h | s - labs_mx | 10 | 1 | h | s - objects_mx | 10 | 1 | h | s - articles_single_shard_hash_mx | 10 | 1 | h | s + customer_mx | 4 | 1 | n | t + nation_mx | 4 | 1 | n | t + part_mx | 4 | 1 | n | t + supplier_mx | 4 | 1 | n | t + nation_hash | 1390000 | 16 | h | s + citus_mx_test_schema.nation_hash | 1390000 | 16 | h | s + lineitem_mx | 1390001 | 16 | h | s + orders_mx | 1390001 | 16 | h | s + limit_orders_mx | 1390002 | 2 | h | s + articles_hash_mx | 1390002 | 2 | h | s + multiple_hash_mx | 1390003 | 2 | h | s + researchers_mx | 1390004 | 2 | h | s + labs_mx | 1390005 | 1 | h | s + objects_mx | 1390005 | 1 | h | s + articles_single_shard_hash_mx | 1390005 | 1 | h | s (23 rows) diff --git a/src/test/regress/expected/multi_mx_modifications_to_reference_tables.out b/src/test/regress/expected/multi_mx_modifications_to_reference_tables.out new file mode 100644 index 000000000..b8febf421 --- /dev/null +++ b/src/test/regress/expected/multi_mx_modifications_to_reference_tables.out @@ -0,0 +1,159 @@ +SET citus.next_shard_id TO 100400; +-- ================================================================= +-- test modification functionality on reference tables from MX nodes +-- ================================================================= +CREATE SCHEMA mx_modify_reference_table; +SET search_path TO 'mx_modify_reference_table'; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +CREATE TABlE ref_table(id int, value_1 int); +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABlE ref_table_2(id int, value_1 int); +SELECT create_reference_table('ref_table_2'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE test_table_1(id int, value_1 int); +SELECT create_distributed_table('test_table_1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_table_1 VALUES(5,5),(6,6); +\c - - - :worker_1_port +SET search_path TO 'mx_modify_reference_table'; +-- Simple DML operations from the first worker node +INSERT INTO ref_table VALUES(1,1),(2,2); +SELECT SUM(value_1) FROM ref_table; + sum +----- + 3 +(1 row) + +UPDATE ref_table SET value_1 = 1 WHERE id = 2; +SELECT SUM(value_1) FROM ref_table; + sum +----- + 2 +(1 row) + +DELETE FROM ref_table; +SELECT SUM(value_1) FROM ref_table; + sum +----- + +(1 row) + +COPY ref_table FROM STDIN DELIMITER ','; +SELECT SUM(value_1) FROM ref_table; + sum +----- + 3 +(1 row) + +-- Select For Update also follows the same logic with modification. +-- It has been started to be supported on MX nodes with DML operations. 
+SELECT * FROM ref_table FOR UPDATE; + id | value_1 +----+--------- + 1 | 1 + 2 | 2 +(2 rows) + +-- Both distributed and non-distributed INSERT INTO ... SELECT +-- queries are also supported on MX nodes. +INSERT INTO ref_table SELECT * FROM test_table_1; +SELECT SUM(value_1) FROM ref_table; + sum +----- + 14 +(1 row) + +INSERT INTO ref_table_2 SELECT * FROM ref_table; +SELECT SUM(value_1) FROM ref_table_2; + sum +----- + 14 +(1 row) + +-- Now connect to the second worker and observe the results as well +\c - - - :worker_2_port +SET search_path TO 'mx_modify_reference_table'; +SELECT SUM(value_1) FROM ref_table; + sum +----- + 14 +(1 row) + +SELECT SUM(value_1) FROM ref_table_2; + sum +----- + 14 +(1 row) + +-- Run basic queries from second worker node. These tests have been added +-- since locking logic is slightly different between running these commands +-- from first worker node and the second one +INSERT INTO ref_table VALUES(1,1),(2,2); +SELECT SUM(value_1) FROM ref_table; + sum +----- + 17 +(1 row) + +UPDATE ref_table SET value_1 = 1 WHERE id = 2; +SELECT SUM(value_1) FROM ref_table; + sum +----- + 15 +(1 row) + +COPY ref_table FROM STDIN DELIMITER ','; +SELECT SUM(value_1) FROM ref_table; + sum +----- + 18 +(1 row) + +INSERT INTO ref_table SELECT * FROM test_table_1; +SELECT SUM(value_1) FROM ref_table; + sum +----- + 29 +(1 row) + +INSERT INTO ref_table_2 SELECT * FROM ref_table; +SELECT SUM(value_1) FROM ref_table_2; + sum +----- + 43 +(1 row) + +\c - - - :master_port +SET search_path TO 'public'; +DROP SCHEMA mx_modify_reference_table CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table mx_modify_reference_table.ref_table +drop cascades to table mx_modify_reference_table.ref_table_2 +drop cascades to table mx_modify_reference_table.test_table_1 diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 0ba5f7c75..9ef20e5ff 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -931,11 +931,6 @@ SELECT * 41 | 1 | aznavour | 11814 (5 rows) --- SELECT ... FOR UPDATE does not supported from MX nodes if it contains --- reference table. -SELECT * FROM customer_mx FOR UPDATE; -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -- not router plannable due to or SELECT * FROM articles_hash_mx diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 5a0b86c46..d46760e8a 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -108,35 +108,6 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; 5 (1 row) --- INSERT/UPDATE/DELETE/COPY on reference tables -SELECT * FROM mx_ref_table ORDER BY col_1; - col_1 | col_2 --------+-------- - -78 | sapien - -37 | morbi - -34 | augue -(3 rows) - -INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum'); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Modifications to reference tables are supported only from the coordinator. -UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Modifications to reference tables are supported only from the coordinator. 
-DELETE FROM mx_ref_table WHERE col_1 = -78; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Modifications to reference tables are supported only from the coordinator. -COPY mx_ref_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -SELECT * FROM mx_ref_table ORDER BY col_1; - col_1 | col_2 --------+-------- - -78 | sapien - -37 | morbi - -34 | augue -(3 rows) - \c - - - :master_port DROP TABLE mx_ref_table; CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 0484a27ee..3186c2ed4 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -18,8 +18,9 @@ test: multi_cluster_management test: multi_test_helpers # the following test has to be run sequentially -test: multi_mx_hide_shard_names -test: multi_mx_partitioning +test: multi_mx_hide_shard_names +test: multi_mx_modifications_to_reference_tables +test: multi_mx_partitioning test: multi_mx_create_table test: multi_mx_copy_data multi_mx_router_planner test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 diff --git a/src/test/regress/sql/multi_mx_create_table.sql b/src/test/regress/sql/multi_mx_create_table.sql index f7ecac215..33e085fbf 100644 --- a/src/test/regress/sql/multi_mx_create_table.sql +++ b/src/test/regress/sql/multi_mx_create_table.sql @@ -3,6 +3,7 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SELECT start_metadata_sync_to_node('localhost', :worker_2_port); diff --git a/src/test/regress/sql/multi_mx_modifications_to_reference_tables.sql b/src/test/regress/sql/multi_mx_modifications_to_reference_tables.sql new file mode 100644 index 000000000..470a9e084 --- /dev/null +++ b/src/test/regress/sql/multi_mx_modifications_to_reference_tables.sql @@ -0,0 +1,91 @@ + + +SET citus.next_shard_id TO 100400; + +-- ================================================================= +-- test modification functionality on reference tables from MX nodes +-- ================================================================= + +CREATE SCHEMA mx_modify_reference_table; +SET search_path TO 'mx_modify_reference_table'; + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; + +SET citus.replication_model TO 'streaming'; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + +CREATE TABlE ref_table(id int, value_1 int); +SELECT create_reference_table('ref_table'); + +CREATE TABlE ref_table_2(id int, value_1 int); +SELECT create_reference_table('ref_table_2'); + +CREATE TABLE test_table_1(id int, value_1 int); +SELECT create_distributed_table('test_table_1', 'id'); +INSERT INTO test_table_1 VALUES(5,5),(6,6); + +\c - - - :worker_1_port +SET search_path TO 'mx_modify_reference_table'; + +-- Simple DML operations from the first worker node +INSERT INTO ref_table VALUES(1,1),(2,2); +SELECT SUM(value_1) FROM ref_table; + +UPDATE ref_table SET value_1 = 1 WHERE id = 2; +SELECT SUM(value_1) FROM ref_table; + +DELETE FROM ref_table; +SELECT SUM(value_1) FROM ref_table; + +COPY ref_table FROM STDIN DELIMITER ','; +1,1 +2,2 +\. +SELECT SUM(value_1) FROM ref_table; + +-- Select For Update also follows the same logic with modification. 
+-- It has been started to be supported on MX nodes with DML operations. +SELECT * FROM ref_table FOR UPDATE; + +-- Both distributed and non-distributed INSERT INTO ... SELECT +-- queries are also supported on MX nodes. +INSERT INTO ref_table SELECT * FROM test_table_1; +SELECT SUM(value_1) FROM ref_table; + +INSERT INTO ref_table_2 SELECT * FROM ref_table; +SELECT SUM(value_1) FROM ref_table_2; + +-- Now connect to the second worker and observe the results as well +\c - - - :worker_2_port +SET search_path TO 'mx_modify_reference_table'; + +SELECT SUM(value_1) FROM ref_table; +SELECT SUM(value_1) FROM ref_table_2; + +-- Run basic queries from second worker node. These tests have been added +-- since locking logic is slightly different between running these commands +-- from first worker node and the second one +INSERT INTO ref_table VALUES(1,1),(2,2); +SELECT SUM(value_1) FROM ref_table; + +UPDATE ref_table SET value_1 = 1 WHERE id = 2; +SELECT SUM(value_1) FROM ref_table; + +COPY ref_table FROM STDIN DELIMITER ','; +1,1 +2,2 +\. +SELECT SUM(value_1) FROM ref_table; + +INSERT INTO ref_table SELECT * FROM test_table_1; +SELECT SUM(value_1) FROM ref_table; + +INSERT INTO ref_table_2 SELECT * FROM ref_table; +SELECT SUM(value_1) FROM ref_table_2; + +\c - - - :master_port + +SET search_path TO 'public'; +DROP SCHEMA mx_modify_reference_table CASCADE; diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index 8da234cc1..366f78092 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -407,10 +407,6 @@ SELECT * FROM articles_hash_mx WHERE author_id = (random()::int * 0 + 1); --- SELECT ... FOR UPDATE does not supported from MX nodes if it contains --- reference table. -SELECT * FROM customer_mx FOR UPDATE; - -- not router plannable due to or SELECT * FROM articles_hash_mx diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 3b604ac0a..83ffc1808 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -79,15 +79,6 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; - --- INSERT/UPDATE/DELETE/COPY on reference tables -SELECT * FROM mx_ref_table ORDER BY col_1; -INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum'); -UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37; -DELETE FROM mx_ref_table WHERE col_1 = -78; -COPY mx_ref_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); -SELECT * FROM mx_ref_table ORDER BY col_1; - \c - - - :master_port DROP TABLE mx_ref_table; CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1);
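Note (illustration, not part of the patch): to tie the second-worker DML exercised in the tests above back to the executor changes at the top of this diff, here is a minimal hypothetical sketch of the locking path a router modification on a reference table now takes from a non-first MX worker. Only the helper name and shard id are made up; the functions called are the ones touched in this diff.

/*
 * Hypothetical walk-through: AcquireExecutorShardLock() now loads the shard
 * interval and hands it to SerializeNonCommutativeWrites(). For a reference
 * table shard, when the cluster has metadata workers and this node is not
 * the first worker, that call first sends a lock_shard_resources() command
 * to the first worker and then also takes the advisory locks locally, so
 * writes from any node serialize on a single lock.
 */
static void
LockReferenceShardForModify(uint64 shardId, LOCKMODE lockMode)
{
	ShardInterval *shardInterval = LoadShardInterval(shardId);

	SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
}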