Partition drop first

tx_metadata_sync
Onder Kalaci 2022-11-29 09:07:32 +01:00
parent da98c2a92f
commit e875774e97
2 changed files with 73 additions and 28 deletions

View File

@ -811,12 +811,6 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode)
*/
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
commandList = lappend(commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
/* first remove partitioned tables to avoid any need for detaching partitions */
commandList = lappend(commandList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
commandList = lappend(commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
/*
* Replicate all objects of the pg_dist_object to the remote node.
*/
@ -1127,7 +1121,8 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
return NULL;
}
static void
DropExistingMetadataInOutsideTransaction(List *nodeList);
/*
* ActivateNodeList iterates over the nodeList and activates the nodes.
@ -1174,6 +1169,34 @@ ActivateNodeList(List *nodeList)
/* both nodes should be the same */
Assert(workerNode->nodeId == node->nodeId);
workerNode =
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(true));
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
if (syncMetadata)
{
nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode);
}
}
/*
* Before we do any remote commands for syncing the metadata,
* we can first drop any existing metadata, just in case we are
* re-syncing the metadata to a node.
*
* Note that we send these commands in an outside transaction
* because in certain cases drop commands generate excessive
* amount of relcache invalidation events, and sending those
* commands in the main transaction that we sync the metadata
* becomes problematic.
*/
DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeToSyncMetadata)
{
/*
* Delete existing reference and replicated table placements on the
* given groupId if the group has been disabled earlier (e.g., isActive
@ -1190,29 +1213,19 @@ ActivateNodeList(List *nodeList)
localOnly);
}
workerNode =
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(true));
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
*/
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
if (syncMetadata)
{
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
*/
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(workerNode);
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(workerNode);
nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode);
}
}
/*
@ -1249,6 +1262,36 @@ ActivateNodeList(List *nodeList)
}
static void
DropExistingMetadataInOutsideTransaction(List *nodeList)
{
List *connectionList = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert (superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
ExecuteRemoteCommandInConnectionList(connectionList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_ALL_SHELL_TABLES_COMMAND);
MultiConnection *connection;
foreach_ptr(connection, connectionList)
CloseConnection(connection);
}
/*
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
* includes only replicating the reference tables and setting isactive column of the

View File

@ -189,6 +189,8 @@ WorkerDropDistributedTable(Oid relationId)
PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL);
}
}
#include "access/xact.h"
/*