From 7d7f9eef7a1900f0c4c3e2d3057112acfe374ddb Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 2 Apr 2020 16:11:14 -0700 Subject: [PATCH] Feedback by pykello --- .../distributed/metadata/node_metadata.c | 13 +++++ .../transaction/transaction_management.c | 1 + .../distributed/utils/reference_table_utils.c | 55 +++++++++++++++++-- .../distributed/transaction_management.h | 3 + 4 files changed, 67 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 798dad789..4cc9f5d17 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -60,6 +60,9 @@ int GroupSize = 1; /* config variable managed via guc.c */ char *CurrentCluster = "default"; +/* did current transaction modify pg_dist_node? */ +bool TransactionModifiedNodeMetadata = false; + typedef struct NodeMetadata { int32 groupId; @@ -158,6 +161,7 @@ master_add_node(PG_FUNCTION_ARGS) int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); + TransactionModifiedNodeMetadata = true; /* * After adding new node, if the node did not already exist, we will activate @@ -196,6 +200,7 @@ master_add_inactive_node(PG_FUNCTION_ARGS) int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); + TransactionModifiedNodeMetadata = true; PG_RETURN_INT32(nodeId); } @@ -229,6 +234,7 @@ master_add_secondary_node(PG_FUNCTION_ARGS) int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); + TransactionModifiedNodeMetadata = true; PG_RETURN_INT32(nodeId); } @@ -252,6 +258,7 @@ master_remove_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort); + TransactionModifiedNodeMetadata = true; PG_RETURN_VOID(); } @@ -305,6 +312,7 @@ master_disable_node(PG_FUNCTION_ARGS) } SetNodeState(nodeName, nodePort, isActive); + TransactionModifiedNodeMetadata = true; } PG_CATCH(); { @@ -351,6 +359,7 @@ master_set_node_property(PG_FUNCTION_ARGS) ))); } + TransactionModifiedNodeMetadata = true; PG_RETURN_VOID(); } @@ -450,6 +459,8 @@ master_activate_node(PG_FUNCTION_ARGS) nodePort); ActivateNode(workerNode->workerName, workerNode->workerPort); + TransactionModifiedNodeMetadata = true; + PG_RETURN_INT32(workerNode->nodeId); } @@ -721,6 +732,8 @@ master_update_node(PG_FUNCTION_ARGS) TerminateBackgroundWorker(handle); } + TransactionModifiedNodeMetadata = true; + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index f28a43571..803851ff2 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -449,6 +449,7 @@ ResetGlobalVariables() dlist_init(&InProgressTransactions); activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; + TransactionModifiedNodeMetadata = false; } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 8430a9e76..ea5265e00 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -24,6 +24,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_logical_planner.h" #include "distributed/reference_table_utils.h" +#include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" @@ -47,6 +48,7 @@ static void ReplicateShardToAllNodes(ShardInterval *shardInterval); static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); +static bool AnyRelationsModifiedInTransaction(List *relationIdList); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -117,7 +119,7 @@ EnsureReferenceTablesExistOnAllNodes(void) ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; - /* prevent this funcion from running concurrently with itself */ + /* prevent this function from running concurrently with itself */ int colocationId = TableColocationId(referenceTableId); LockColocationId(colocationId, ExclusiveLock); @@ -129,7 +131,29 @@ EnsureReferenceTablesExistOnAllNodes(void) return; } - /* TODO: ensure reference tables have not been modified in this transaction */ + /* + * master_copy_shard_placement triggers metadata sync-up, which tries to + * acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement + * in a separate connection. If we have modified pg_dist_node in the + * current backend, this will cause a deadlock. + */ + if (TransactionModifiedNodeMetadata) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified node metadata"))); + } + + /* + * Modifications to reference tables in current transaction are not visible + * to master_copy_shard_placement, since it is done in a separate backend. + */ + if (AnyRelationsModifiedInTransaction(referenceTableIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified a reference table"))); + } bool missingOk = false; ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk); @@ -155,6 +179,28 @@ EnsureReferenceTablesExistOnAllNodes(void) } +/* + * AnyRelationsModifiedInTransaction returns true if any of the given relations + * were modified in the current transaction. + */ +static bool +AnyRelationsModifiedInTransaction(List *relationIdList) +{ + Oid relationId = InvalidOid; + + foreach_oid(relationId, relationIdList) + { + if (GetRelationDDLAccessMode(relationId) != RELATION_NOT_ACCESSED || + GetRelationDMLAccessMode(relationId) != RELATION_NOT_ACCESSED) + { + return true; + } + } + + return false; +} + + /* * WorkersWithoutReferenceTablePlacement returns a list of workers (WorkerNode) that * do not yet have a placement for the given reference table shard ID, but are @@ -176,10 +222,8 @@ WorkersWithoutReferenceTablePlacement(uint64 shardId) { char *nodeName = workerNode->workerName; uint32 nodePort = workerNode->workerPort; - bool missingWorkerOk = true; ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, - nodeName, nodePort, - missingWorkerOk); + nodeName, nodePort); if (targetPlacement == NULL) { workersWithoutPlacements = lappend(workersWithoutPlacements, workerNode); @@ -277,6 +321,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) "reference tables.", relationName))); } + EnsureReferenceTablesExistOnAllNodes(); ReplicateSingleShardTableToAllNodes(relationId); PG_RETURN_VOID(); diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 43c1e4d02..317e5df69 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -103,6 +103,9 @@ extern int DoBlockLevel; /* SET LOCAL statements active in the current (sub-)transaction. */ extern StringInfo activeSetStmts; +/* did current transaction modify pg_dist_node? */ +extern bool TransactionModifiedNodeMetadata; + /* * Coordinated transaction management. */