mirror of https://github.com/citusdata/citus.git
Sync metadata in new clusters
If a user upgrades from an ealier version, and have not synced the metadata, we should not automatically sync the metadata. If this is a post Citus 11 cluster, and we are adding a new worker node, we sync the metadata. The net effect is that new clusters gets Citus MX by default, existing clusters should manually trigger metadata syncing.sync_metadata_only_needed
parent
80f41e94c0
commit
ee472e789e
|
|
@ -91,6 +91,7 @@ typedef struct NodeMetadata
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static int ActivateNode(char *nodeName, int nodePort);
|
static int ActivateNode(char *nodeName, int nodePort);
|
||||||
|
static bool ShouldSyncMetadataToNewNode(WorkerNode *newNode);
|
||||||
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
|
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
|
||||||
static void ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode);
|
static void ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode);
|
||||||
static bool PlacementHasActivePlacementOnAnotherGroup(GroupShardPlacement
|
static bool PlacementHasActivePlacementOnAnotherGroup(GroupShardPlacement
|
||||||
|
|
@ -883,8 +884,7 @@ ActivateNode(char *nodeName, int nodePort)
|
||||||
workerNode =
|
workerNode =
|
||||||
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
|
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
|
||||||
BoolGetDatum(isActive));
|
BoolGetDatum(isActive));
|
||||||
bool syncMetadata =
|
bool syncMetadata = ShouldSyncMetadataToNewNode(workerNode);
|
||||||
EnableMetadataSyncByDefault && NodeIsPrimary(workerNode);
|
|
||||||
|
|
||||||
if (syncMetadata)
|
if (syncMetadata)
|
||||||
{
|
{
|
||||||
|
|
@ -911,6 +911,35 @@ ActivateNode(char *nodeName, int nodePort)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ShouldSyncMetadataToNewNode decides to sync metadata to new node only if
|
||||||
|
* there is at least one healthy primary worker node with metadata synced.
|
||||||
|
*
|
||||||
|
* This is to prevent upgrades from pre Citus-11 to post Citus-11 to automatically
|
||||||
|
* convert the cluster to a metadata synced cluster. However, if the cluster is
|
||||||
|
* post Citus-11 (or had 0 workers on pre Citus-11), we'd sync the metadata.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ShouldSyncMetadataToNewNode(WorkerNode *newNode)
|
||||||
|
{
|
||||||
|
uint32 primaryWorkerCount = ActivePrimaryNonCoordinatorNodeCount();
|
||||||
|
uint32 primariesWithMetadata = CountPrimaryWorkersWithMetadata();
|
||||||
|
|
||||||
|
if (primaryWorkerCount != 0 && primariesWithMetadata == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* This cluster doesnt't have any worker nodes with
|
||||||
|
* metadata synced. In these cases, we refrain from
|
||||||
|
* syncing the metadata.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* otherwise, we check if the user explicitly disabled metadata syncing */
|
||||||
|
return EnableMetadataSyncByDefault && NodeIsPrimary(newNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* citus_update_node moves the requested node to a different nodename and nodeport. It
|
* citus_update_node moves the requested node to a different nodename and nodeport. It
|
||||||
* locks to ensure no queries are running concurrently; and is intended for customers who
|
* locks to ensure no queries are running concurrently; and is intended for customers who
|
||||||
|
|
@ -1472,9 +1501,9 @@ RemoveOldShardPlacementForNodeGroup(int groupId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */
|
/* CountPrimaryWorkersWithMetadata returns the number of primary nodes which have metadata. */
|
||||||
uint32
|
uint32
|
||||||
CountPrimariesWithMetadata(void)
|
CountPrimaryWorkersWithMetadata(void)
|
||||||
{
|
{
|
||||||
uint32 primariesWithMetadata = 0;
|
uint32 primariesWithMetadata = 0;
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
|
|
@ -1648,7 +1677,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
|
||||||
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
|
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
|
||||||
|
|
||||||
/* finally prepare the insert command and send it to all primary nodes */
|
/* finally prepare the insert command and send it to all primary nodes */
|
||||||
uint32 primariesWithMetadata = CountPrimariesWithMetadata();
|
uint32 primariesWithMetadata = CountPrimaryWorkersWithMetadata();
|
||||||
if (primariesWithMetadata != 0)
|
if (primariesWithMetadata != 0)
|
||||||
{
|
{
|
||||||
List *workerNodeList = list_make1(workerNode);
|
List *workerNodeList = list_make1(workerNode);
|
||||||
|
|
|
||||||
|
|
@ -101,7 +101,7 @@ extern WorkerNode * SetWorkerColumnOptional(WorkerNode *workerNode, int columnIn
|
||||||
value);
|
value);
|
||||||
extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex,
|
extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex,
|
||||||
Datum value);
|
Datum value);
|
||||||
extern uint32 CountPrimariesWithMetadata(void);
|
extern uint32 CountPrimaryWorkersWithMetadata(void);
|
||||||
extern WorkerNode * GetFirstPrimaryWorkerNode(void);
|
extern WorkerNode * GetFirstPrimaryWorkerNode(void);
|
||||||
|
|
||||||
/* Function declarations for worker node utilities */
|
/* Function declarations for worker node utilities */
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue