Feedback by pykello

guc_for_replicate_on_activate
Hadi Moshayedi 2020-04-02 16:11:14 -07:00
parent bf6b12e351
commit 7d7f9eef7a
4 changed files with 67 additions and 5 deletions

View File

@ -60,6 +60,9 @@ int GroupSize = 1;
/* config variable managed via guc.c */
char *CurrentCluster = "default";
/* did current transaction modify pg_dist_node? */
bool TransactionModifiedNodeMetadata = false;
typedef struct NodeMetadata
{
int32 groupId;
@ -158,6 +161,7 @@ master_add_node(PG_FUNCTION_ARGS)
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
/*
* After adding new node, if the node did not already exist, we will activate
@ -196,6 +200,7 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
PG_RETURN_INT32(nodeId);
}
@ -229,6 +234,7 @@ master_add_secondary_node(PG_FUNCTION_ARGS)
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
PG_RETURN_INT32(nodeId);
}
@ -252,6 +258,7 @@ master_remove_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort);
TransactionModifiedNodeMetadata = true;
PG_RETURN_VOID();
}
@ -305,6 +312,7 @@ master_disable_node(PG_FUNCTION_ARGS)
}
SetNodeState(nodeName, nodePort, isActive);
TransactionModifiedNodeMetadata = true;
}
PG_CATCH();
{
@ -351,6 +359,7 @@ master_set_node_property(PG_FUNCTION_ARGS)
)));
}
TransactionModifiedNodeMetadata = true;
PG_RETURN_VOID();
}
@ -450,6 +459,8 @@ master_activate_node(PG_FUNCTION_ARGS)
nodePort);
ActivateNode(workerNode->workerName, workerNode->workerPort);
TransactionModifiedNodeMetadata = true;
PG_RETURN_INT32(workerNode->nodeId);
}
@ -721,6 +732,8 @@ master_update_node(PG_FUNCTION_ARGS)
TerminateBackgroundWorker(handle);
}
TransactionModifiedNodeMetadata = true;
PG_RETURN_VOID();
}

View File

@ -449,6 +449,7 @@ ResetGlobalVariables()
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
TransactionModifiedNodeMetadata = false;
}

View File

@ -24,6 +24,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
@ -47,6 +48,7 @@ static void ReplicateShardToAllNodes(ShardInterval *shardInterval);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
int nodePort);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
static bool AnyRelationsModifiedInTransaction(List *relationIdList);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
@ -117,7 +119,7 @@ EnsureReferenceTablesExistOnAllNodes(void)
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
/* prevent this funcion from running concurrently with itself */
/* prevent this function from running concurrently with itself */
int colocationId = TableColocationId(referenceTableId);
LockColocationId(colocationId, ExclusiveLock);
@ -129,7 +131,29 @@ EnsureReferenceTablesExistOnAllNodes(void)
return;
}
/* TODO: ensure reference tables have not been modified in this transaction */
/*
* master_copy_shard_placement triggers metadata sync-up, which tries to
* acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement
* in a separate connection. If we have modified pg_dist_node in the
* current backend, this will cause a deadlock.
*/
if (TransactionModifiedNodeMetadata)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified node metadata")));
}
/*
* Modifications to reference tables in current transaction are not visible
* to master_copy_shard_placement, since it is done in a separate backend.
*/
if (AnyRelationsModifiedInTransaction(referenceTableIdList))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified a reference table")));
}
bool missingOk = false;
ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
@ -155,6 +179,28 @@ EnsureReferenceTablesExistOnAllNodes(void)
}
/*
* AnyRelationsModifiedInTransaction returns true if any of the given relations
* were modified in the current transaction.
*/
static bool
AnyRelationsModifiedInTransaction(List *relationIdList)
{
Oid relationId = InvalidOid;
foreach_oid(relationId, relationIdList)
{
if (GetRelationDDLAccessMode(relationId) != RELATION_NOT_ACCESSED ||
GetRelationDMLAccessMode(relationId) != RELATION_NOT_ACCESSED)
{
return true;
}
}
return false;
}
/*
* WorkersWithoutReferenceTablePlacement returns a list of workers (WorkerNode) that
* do not yet have a placement for the given reference table shard ID, but are
@ -176,10 +222,8 @@ WorkersWithoutReferenceTablePlacement(uint64 shardId)
{
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
bool missingWorkerOk = true;
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort,
missingWorkerOk);
nodeName, nodePort);
if (targetPlacement == NULL)
{
workersWithoutPlacements = lappend(workersWithoutPlacements, workerNode);
@ -277,6 +321,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
"reference tables.", relationName)));
}
EnsureReferenceTablesExistOnAllNodes();
ReplicateSingleShardTableToAllNodes(relationId);
PG_RETURN_VOID();

View File

@ -103,6 +103,9 @@ extern int DoBlockLevel;
/* SET LOCAL statements active in the current (sub-)transaction. */
extern StringInfo activeSetStmts;
/* did current transaction modify pg_dist_node? */
extern bool TransactionModifiedNodeMetadata;
/*
* Coordinated transaction management.
*/