mirror of https://github.com/citusdata/citus.git

Synchronize hasmetadata flag on mx workers (#5086)

* Synchronize hasmetadata flag on mx workers
* Switch to sequential execution
* Add test
* Use SetWorkerColumn
* Add test for stop_sync
* Remove usage of UpdateHasmetadataOnWorkersWithMetadata
* Remove MarkNodeMetadataSynced
* Fix test for metadatasynced
* Remove MarkNodeMetadataSynced
* Style
* Remove MarkNodeHasMetadata
* Remove UpdateDistNodeBoolAttr
* Refactor SetWorkerColumn
* Use SetWorkerColumnLocalOnly when setting up dependencies
* Use SetWorkerColumnLocalOnly in TriggerSyncMetadataToPrimaryNodes
* Style
* Make update command generator functions static
* Set metadatasynced before syncing
* Call SetWorkerColumn only if the sync is successful
* Try to sync all nodes
* Fix indexno
* Update metadatasynced locally first
* Break if a node fails to sync metadata
* Send worker commands optional
* Style & Rebase
* Add raiseOnError param to SetWorkerColumn
* Style
* Set metadatasynced for all metadata nodes
* Style
* Introduce SetWorkerColumnOptional
* Polish
* Style
* Dont send set command to not synced metadata nodes
* Style
* Polish
* Add test for stop_sync
* Add test for shouldhaveshards
* Add test for isactive flag
* Sort by placementid in the function verify_metadata
* Cover edge cases for failing nodes
* Add comments
* Add nodeport to isactive test
* Add warning if metadata out of sync
* Update warning message

pull/5175/head
parent e5b32b2c3c
commit 9e90894f21
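The thrust of the change: the direct pg_dist_node catalog writers (MarkNodeHasMetadata, MarkNodeMetadataSynced, UpdateDistNodeBoolAttr) are replaced by SetWorkerColumn / SetWorkerColumnOptional / SetWorkerColumnLocalOnly, which also propagate flag changes to the other metadata (MX) workers. A minimal sketch of inspecting the two flags involved, on any node that has metadata (column names as in pg_dist_node):

-- the flags this commit keeps consistent across the cluster
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced
FROM pg_dist_node
ORDER BY nodeid;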
@@ -44,6 +44,7 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_executor.h"
 #include "distributed/namespace_utils.h"
+#include "distributed/pg_dist_node.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/version_compat.h"
@@ -1109,7 +1110,8 @@ TriggerSyncMetadataToPrimaryNodes(void)
		 * this because otherwise node activation might fail withing transaction blocks.
		 */
		LockRelationOid(DistNodeRelationId(), ExclusiveLock);
-		MarkNodeHasMetadata(workerNode->workerName, workerNode->workerPort, true);
+		SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
+								 BoolGetDatum(true));

		triggerMetadataSync = true;
	}
@@ -83,8 +83,6 @@ char *EnableManualMetadataChangesForUser = "";
 static void EnsureSequentialModeMetadataOperations(void);
 static List * GetDistributedTableDDLEvents(Oid relationId);
 static char * LocalGroupIdUpdateCommand(int32 groupId);
-static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort,
-								   int attrNum, bool value);
 static List * SequenceDependencyCommandList(Oid relationId);
 static char * TruncateTriggerCreateCommand(Oid relationId);
 static char * SchemaOwnerName(Oid objectId);
@@ -170,9 +168,6 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
 {
	char *escapedNodeName = quote_literal_cstr(nodeNameString);

-	/* fail if metadata synchronization doesn't succeed */
-	bool raiseInterrupts = true;
-
	CheckCitusVersion(ERROR);
	EnsureCoordinator();
	EnsureSuperUser();
@@ -209,7 +204,21 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
	}

	UseCoordinatedTransaction();
-	MarkNodeHasMetadata(nodeNameString, nodePort, true);
+
+	/*
+	 * One would normally expect to set hasmetadata first, and then metadata sync.
+	 * However, at this point we do the order reverse.
+	 * We first set metadatasynced, and then hasmetadata; since setting columns for
+	 * nodes with metadatasynced==false could cause errors.
+	 * (See ErrorIfAnyMetadataNodeOutOfSync)
+	 * We can safely do that because we are in a coordinated transaction and the changes
+	 * are only visible to our own transaction.
+	 * If anything goes wrong, we are going to rollback all the changes.
+	 */
+	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
+								 BoolGetDatum(true));
+	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(
+		true));

	if (!NodeIsPrimary(workerNode))
	{
@@ -220,8 +229,9 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
		return;
	}

+	/* fail if metadata synchronization doesn't succeed */
+	bool raiseInterrupts = true;
	SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
-	MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
 }

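Note that StartMetadataSyncToNode sets the flags inside a coordinated transaction, so a failure rolls back both the flag updates and the snapshot sync. A minimal sketch of that property, which the regression test at the bottom of this diff also exercises (57637 is the test worker port, illustrative only):

-- start_metadata_sync_to_node is transactional; rolling back undoes the flag changes
BEGIN;
SELECT start_metadata_sync_to_node('localhost', 57637);
ROLLBACK;
SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = 57637;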
@@ -303,9 +313,6 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
		PG_RETURN_VOID();
	}

-	MarkNodeHasMetadata(nodeNameString, nodePort, false);
-	MarkNodeMetadataSynced(nodeNameString, nodePort, false);
-
	if (clearMetadata)
	{
		if (NodeIsPrimary(workerNode))
@@ -326,6 +333,11 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
		}
	}

+	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(
+		false));
+	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
+								 BoolGetDatum(false));
+
	PG_RETURN_VOID();
 }

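Symmetrically, stop_metadata_sync_to_node now clears both flags through SetWorkerColumn, so the reset reaches the remaining metadata workers as well. Expected end state on the coordinator, mirroring the regression output further down (port illustrative):

-- after stopping sync, both flags read false for that node
SELECT stop_metadata_sync_to_node('localhost', 57638);
SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = 57638;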
@@ -1115,83 +1127,6 @@ LocalGroupIdUpdateCommand(int32 groupId)
 }


-/*
- * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in
- * pg_dist_node to hasMetadata.
- */
-void
-MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata)
-{
-	UpdateDistNodeBoolAttr(nodeName, nodePort,
-						   Anum_pg_dist_node_hasmetadata,
-						   hasMetadata);
-}
-
-
-/*
- * MarkNodeMetadataSynced function sets the metadatasynced column of the
- * specified worker in pg_dist_node to the given value.
- */
-void
-MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced)
-{
-	UpdateDistNodeBoolAttr(nodeName, nodePort,
-						   Anum_pg_dist_node_metadatasynced,
-						   synced);
-}
-
-
-/*
- * UpdateDistNodeBoolAttr updates a boolean attribute of the specified worker
- * to the given value.
- */
-static void
-UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort, int attrNum, bool value)
-{
-	const bool indexOK = false;
-
-	ScanKeyData scanKey[2];
-	Datum values[Natts_pg_dist_node];
-	bool isnull[Natts_pg_dist_node];
-	bool replace[Natts_pg_dist_node];
-
-	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
-	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
-
-	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
-				BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
-	ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
-				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
-
-	SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
-													NULL, 2, scanKey);
-
-	HeapTuple heapTuple = systable_getnext(scanDescriptor);
-	if (!HeapTupleIsValid(heapTuple))
-	{
-		ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
-							   nodeName, nodePort)));
-	}
-
-	memset(replace, 0, sizeof(replace));
-
-	values[attrNum - 1] = BoolGetDatum(value);
-	isnull[attrNum - 1] = false;
-	replace[attrNum - 1] = true;
-
-	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
-
-	CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);
-
-	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
-
-	CommandCounterIncrement();
-
-	systable_endscan(scanDescriptor);
-	table_close(pgDistNode, NoLock);
-}
-
-
 /*
  * SequenceDDLCommandsForTable returns a list of commands which create sequences (and
  * their schemas) to run on workers before creating the relation. The sequence creation
@@ -1840,6 +1775,7 @@ SyncMetadataToNodes(void)
		return METADATA_SYNC_FAILED_LOCK;
	}

+	List *syncedWorkerList = NIL;
	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerList)
@@ -1847,7 +1783,6 @@ SyncMetadataToNodes(void)
		if (workerNode->hasMetadata && !workerNode->metadataSynced)
		{
			bool raiseInterrupts = false;
-
			if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
			{
				ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
@@ -1857,12 +1792,27 @@ SyncMetadataToNodes(void)
			}
			else
			{
-				MarkNodeMetadataSynced(workerNode->workerName,
-									   workerNode->workerPort, true);
+				/* we add successfully synced nodes to set metadatasynced column later */
+				syncedWorkerList = lappend(syncedWorkerList, workerNode);
			}
		}
	}

+	foreach_ptr(workerNode, syncedWorkerList)
+	{
+		SetWorkerColumnOptional(workerNode, Anum_pg_dist_node_metadatasynced,
+								BoolGetDatum(true));
+
+		/* we fetch the same node again to check if it's synced or not */
+		WorkerNode *nodeUpdated = FindWorkerNode(workerNode->workerName,
+												 workerNode->workerPort);
+		if (!nodeUpdated->metadataSynced)
+		{
+			/* set the result to FAILED to trigger the sync again */
+			result = METADATA_SYNC_FAILED_SYNC;
+		}
+	}
+
	return result;
 }

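The daemon path above now syncs first and only then flips metadatasynced, returning METADATA_SYNC_FAILED_SYNC so the sync is retried while any node is left behind. The nodes it targets are exactly those whose two flags disagree; the same stragglers can be listed by hand:

-- nodes that have metadata but are not yet marked as synced
SELECT nodename, nodeport
FROM pg_dist_node
WHERE hasmetadata AND NOT metadatasynced;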
@@ -109,6 +109,11 @@ static bool NodeIsLocal(WorkerNode *worker);
 static void SetLockTimeoutLocally(int32 lock_cooldown);
 static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
 static bool UnsetMetadataSyncedForAll(void);
+static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
+													int columnIndex,
+													Datum value);
+static char * NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata);
+static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
 static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
											   char *field);
 static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
@@ -579,8 +584,8 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
		 */
		if (ClusterHasDistributedFunctionWithDistArgument())
		{
-			MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort,
-								true);
+			SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata,
+									 BoolGetDatum(true));
			TriggerMetadataSyncOnCommit();
		}
	}
@@ -1554,11 +1559,85 @@ AddNodeMetadata(char *nodeName, int32 nodePort,

 /*
  * SetWorkerColumn function sets the column with the specified index
+ * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
+ * It also sends the same command for node update to other metadata nodes.
+ * If anything fails during the transaction, we rollback it.
+ * Returns the new worker node after the modification.
+ */
+WorkerNode *
+SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
+{
+	workerNode = SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
+
+	char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
+																	  columnIndex,
+																	  value);
+
+	SendCommandToWorkersWithMetadata(metadataSyncCommand);
+
+	return workerNode;
+}
+
+
+/*
+ * SetWorkerColumnOptional function sets the column with the specified index
+ * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
+ * It also sends the same command optionally for node update to other metadata nodes,
+ * meaning that failures are ignored. Returns the new worker node after the modification.
+ */
+WorkerNode *
+SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
+{
+	char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
+																	  columnIndex,
+																	  value);
+
+	List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
+												   ShareLock);
+
+	/* open connections in parallel */
+	WorkerNode *worker = NULL;
+	foreach_ptr(worker, workerNodeList)
+	{
+		bool success = SendOptionalCommandListToWorkerInCoordinatedTransaction(
+			worker->workerName, worker->workerPort,
+			CurrentUserName(),
+			list_make1(metadataSyncCommand));
+
+		if (!success)
+		{
+			/* metadata out of sync, mark the worker as not synced */
+			ereport(WARNING, (errmsg("Updating the metadata of the node %s:%d "
+									 "is failed on node %s:%d."
+									 "Metadata on %s:%d is marked as out of sync.",
+									 workerNode->workerName, workerNode->workerPort,
+									 worker->workerName, worker->workerPort,
+									 worker->workerName, worker->workerPort)));
+
+			SetWorkerColumnLocalOnly(worker, Anum_pg_dist_node_metadatasynced,
+									 BoolGetDatum(false));
+		}
+		else if (workerNode->nodeId == worker->nodeId)
+		{
+			/*
+			 * If this is the node we want to update and it is updated succesfully,
+			 * then we can safely update the flag on the coordinator as well.
+			 */
+			SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
+		}
+	}
+
+	return FindWorkerNode(workerNode->workerName, workerNode->workerPort);
+}
+
+
+/*
+ * SetWorkerColumnLocalOnly function sets the column with the specified index
  * (see pg_dist_node.h) on the worker in pg_dist_node.
  * It returns the new worker node after the modification.
  */
-static WorkerNode *
-SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
+WorkerNode *
+SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
 {
	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
@@ -1567,47 +1646,6 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
	Datum values[Natts_pg_dist_node];
	bool isnull[Natts_pg_dist_node];
	bool replace[Natts_pg_dist_node];
-	char *metadataSyncCommand = NULL;
-
-
-	switch (columnIndex)
-	{
-		case Anum_pg_dist_node_hasmetadata:
-		{
-			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
-
-			break;
-		}
-
-		case Anum_pg_dist_node_isactive:
-		{
-			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
-
-			metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
-														 DatumGetBool(value));
-			break;
-		}
-
-		case Anum_pg_dist_node_shouldhaveshards:
-		{
-			metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId,
-																DatumGetBool(value));
-			break;
-		}
-
-		case Anum_pg_dist_node_metadatasynced:
-		{
-			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
-
-			break;
-		}
-
-		default:
-		{
-			ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
-								   workerNode->workerName, workerNode->workerPort)));
-		}
-	}
-
	if (heapTuple == NULL)
	{
@@ -1631,12 +1669,99 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)

	table_close(pgDistNode, NoLock);

-	/* we also update the column at worker nodes */
-	SendCommandToWorkersWithMetadata(metadataSyncCommand);
	return newWorkerNode;
 }


+/*
+ * GetMetadataSyncCommandToSetNodeColumn checks if the given workerNode and value is
+ * valid or not. Then it returns the necessary metadata sync command as a string.
+ */
+static char *
+GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum
+									  value)
+{
+	char *metadataSyncCommand = NULL;
+
+	switch (columnIndex)
+	{
+		case Anum_pg_dist_node_hasmetadata:
+		{
+			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
+			metadataSyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId,
+															   DatumGetBool(value));
+			break;
+		}
+
+		case Anum_pg_dist_node_isactive:
+		{
+			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
+
+			metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
+														 DatumGetBool(value));
+			break;
+		}
+
+		case Anum_pg_dist_node_shouldhaveshards:
+		{
+			metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId,
+																DatumGetBool(value));
+			break;
+		}
+
+		case Anum_pg_dist_node_metadatasynced:
+		{
+			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
+			metadataSyncCommand = NodeMetadataSyncedUpdateCommand(workerNode->nodeId,
+																  DatumGetBool(value));
+			break;
+		}
+
+		default:
+		{
+			ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
+								   workerNode->workerName, workerNode->workerPort)));
+		}
+	}
+
+	return metadataSyncCommand;
+}
+
+
+/*
+ * NodeHasmetadataUpdateCommand generates and returns a SQL UPDATE command
+ * that updates the hasmetada column of pg_dist_node, for the given nodeid.
+ */
+static char *
+NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata)
+{
+	StringInfo updateCommand = makeStringInfo();
+	char *hasMetadataString = hasMetadata ? "TRUE" : "FALSE";
+	appendStringInfo(updateCommand,
+					 "UPDATE pg_dist_node SET hasmetadata = %s "
+					 "WHERE nodeid = %u",
+					 hasMetadataString, nodeId);
+	return updateCommand->data;
+}
+
+
+/*
+ * NodeMetadataSyncedUpdateCommand generates and returns a SQL UPDATE command
+ * that updates the metadataSynced column of pg_dist_node, for the given nodeid.
+ */
+static char *
+NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced)
+{
+	StringInfo updateCommand = makeStringInfo();
+	char *hasMetadataString = metadataSynced ? "TRUE" : "FALSE";
+	appendStringInfo(updateCommand,
+					 "UPDATE pg_dist_node SET metadatasynced = %s "
+					 "WHERE nodeid = %u",
+					 hasMetadataString, nodeId);
+	return updateCommand->data;
+}
+
+
 /*
  * ErrorIfCoordinatorMetadataSetFalse throws an error if the input node
  * is the coordinator and the value is false.
@@ -1655,28 +1780,28 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field)

 /*
  * SetShouldHaveShards function sets the shouldhaveshards column of the
- * specified worker in pg_dist_node.
+ * specified worker in pg_dist_node. also propagates this to other metadata nodes.
  * It returns the new worker node after the modification.
  */
 static WorkerNode *
 SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards)
 {
-	return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards,
-						   BoolGetDatum(shouldHaveShards));
+	return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards, BoolGetDatum(
+		shouldHaveShards));
 }


 /*
  * SetNodeState function sets the isactive column of the specified worker in
- * pg_dist_node to isActive.
+ * pg_dist_node to isActive. Also propagates this to other metadata nodes.
  * It returns the new worker node after the modification.
  */
 static WorkerNode *
 SetNodeState(char *nodeName, int nodePort, bool isActive)
 {
	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
-	return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive,
-						   BoolGetDatum(isActive));
+	return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(
+		isActive));
 }
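All of the setters above funnel into SetWorkerColumn, which ships one generated statement per flag to every metadata node. Going by the command generators in the previous hunk, the propagated SQL takes this shape (nodeid 2 is illustrative):

-- as produced by NodeHasmetadataUpdateCommand / NodeMetadataSyncedUpdateCommand
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2;
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2;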
@@ -48,8 +48,6 @@ extern List * GrantOnSchemaDDLCommands(Oid schemaId);
 extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
									  uint64 shardLength, int32 groupId);
 extern void CreateTableMetadataOnWorkers(Oid relationId);
-extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
-extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
 extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
 extern void SyncMetadataToNodesMain(Datum main_arg);
 extern void SignalMetadataSyncDaemon(Oid database, int sig);
@@ -96,6 +96,11 @@ extern bool NodeIsPrimary(WorkerNode *worker);
 extern bool NodeIsSecondary(WorkerNode *worker);
 extern bool NodeIsReadable(WorkerNode *worker);
 extern bool NodeIsCoordinator(WorkerNode *node);
+extern WorkerNode * SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value);
+extern WorkerNode * SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum
+											value);
+extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex,
+											 Datum value);
 extern uint32 CountPrimariesWithMetadata(void);
 extern WorkerNode * GetFirstPrimaryWorkerNode(void);
@@ -248,7 +248,7 @@ EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2000 WHERE measureid = 3;
    ->  Task
          Node: host=localhost port=xxxxx dbname=regression
          ->  Aggregate
-              ->  Bitmap Heap Scan on sensors_2000_2580005 sensors_2000
+              ->  Bitmap Heap Scan on sensors_2000_2580005 sensors_xxx
                    Recheck Cond: (measureid = 3)
                    ->  Bitmap Index Scan on sensors_2000_pkey_2580005
                          Index Cond: (measureid = 3)
@@ -263,7 +263,7 @@ EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2001 WHERE measureid = 3;
    ->  Task
          Node: host=localhost port=xxxxx dbname=regression
          ->  Aggregate
-              ->  Bitmap Heap Scan on sensors_2001_2580009 sensors_2001
+              ->  Bitmap Heap Scan on sensors_2001_2580009 sensors_xxx
                    Recheck Cond: (measureid = 3)
                    ->  Bitmap Index Scan on sensors_2001_pkey_2580009
                          Index Cond: (measureid = 3)
@@ -278,7 +278,7 @@ EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2002 WHERE measureid = 3;
    ->  Task
          Node: host=localhost port=xxxxx dbname=regression
          ->  Aggregate
-              ->  Bitmap Heap Scan on sensors_2002_2580013 sensors_2002
+              ->  Bitmap Heap Scan on sensors_2002_2580013 sensors_xxx
                    Recheck Cond: (measureid = 3)
                    ->  Bitmap Index Scan on sensors_2002_pkey_2580013
                          Index Cond: (measureid = 3)
@@ -293,7 +293,7 @@ EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2003 WHERE measureid = 3;
    ->  Task
          Node: host=localhost port=xxxxx dbname=regression
          ->  Aggregate
-              ->  Bitmap Heap Scan on sensors_2003_2580017 sensors_2003
+              ->  Bitmap Heap Scan on sensors_2003_2580017 sensors_xxx
                    Recheck Cond: (measureid = 3)
                    ->  Bitmap Index Scan on sensors_2003_pkey_2580017
                          Index Cond: (measureid = 3)
@@ -266,7 +266,7 @@ SELECT * FROM pg_dist_local_group;
 SELECT * FROM pg_dist_node ORDER BY nodeid;
  nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole  |  nodecluster   | metadatasynced | shouldhaveshards
 ---------------------------------------------------------------------
-      1 |       1 | localhost |    57637 | default  | t           | t        | primary   | default        | f              | t
+      1 |       1 | localhost |    57637 | default  | t           | t        | primary   | default        | t              | t
       2 |       2 | localhost |    57638 | default  | f           | t        | primary   | default        | f              | t
       4 |       1 | localhost |     8888 | default  | f           | t        | secondary | default        | f              | t
       5 |       1 | localhost |     8889 | default  | f           | t        | secondary | second-cluster | f              | t
@@ -26,7 +26,7 @@ WITH dist_node_summary AS (
                    ARRAY[dist_node_summary.query, dist_node_summary.query],
                    false)
 ), dist_placement_summary AS (
-    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
+    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY placementid) FROM pg_dist_placement' AS query
 ), dist_placement_check AS (
     SELECT count(distinct result) = 1 AS matches
     FROM dist_placement_summary CROSS JOIN LATERAL
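Rationale for the change above: shardid repeats across placements of the same shard while placementid is unique, so ordering by placementid makes the aggregated JSON deterministic on every node being compared (the commit message's "Sort by placementid in the function verify_metadata"). The inner comparison query on its own:

SELECT jsonb_agg(pg_dist_placement ORDER BY placementid) FROM pg_dist_placement;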
@@ -271,6 +271,148 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
 (1 row)

 \c - - - :master_port
+-- test synchronization for pg_dist_node flags
+SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - - - :worker_1_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ hasmetadata | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+ t           | t              | f
+ t           | t              | f
+(2 rows)
+
+\c - - - :worker_2_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ hasmetadata | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+ t           | t              | f
+ t           | t              | f
+(2 rows)
+
+\c - - - :master_port
+SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
+NOTICE:  dropping metadata on the node (localhost,57637)
+ stop_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - - - :worker_1_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ hasmetadata | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+(0 rows)
+
+\c - - - :worker_2_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ hasmetadata | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+ f           | f              | t
+ t           | t              | t
+(2 rows)
+
+\c - - - :master_port
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+NOTICE:  dropping metadata on the node (localhost,57638)
+ stop_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - - - :worker_1_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ hasmetadata | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+(0 rows)
+
+\c - - - :worker_2_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ hasmetadata | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+(0 rows)
+
+\c - - - :master_port
+-- verify that mx workers are updated when disabling/activating nodes
+SELECT citus_disable_node('localhost', :worker_1_port);
+NOTICE:  Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57637) to activate this node back.
+ citus_disable_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - - - :worker_2_port
+SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ nodeport | isactive
+---------------------------------------------------------------------
+    57637 | f
+    57638 | t
+(2 rows)
+
+\c - - - :master_port
+SET client_min_messages TO ERROR;
+SELECT citus_activate_node('localhost', :worker_1_port);
+ citus_activate_node
+---------------------------------------------------------------------
+                  17
+(1 row)
+
+\c - - - :worker_2_port
+SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+ nodeport | isactive
+---------------------------------------------------------------------
+    57637 | t
+    57638 | t
+(2 rows)
+
+\c - - - :master_port
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+NOTICE:  dropping metadata on the node (localhost,57638)
+ stop_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
 SET search_path TO "start_stop_metadata_sync";
 -- both start & stop metadata sync operations can be transactional
 BEGIN;
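Throughout this expected output the test reconnects to each worker (\c - - - :worker_2_port) to confirm that the flag updates actually arrived there, not only on the coordinator. The same probe works on any MX worker:

-- run on a worker to see the node state that was propagated to it
SELECT nodeport, hasmetadata, metadatasynced, shouldhaveshards
FROM pg_dist_node
ORDER BY nodeport;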
@@ -23,7 +23,7 @@ WITH dist_node_summary AS (
                    ARRAY[dist_node_summary.query, dist_node_summary.query],
                    false)
 ), dist_placement_summary AS (
-    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
+    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY placementid) FROM pg_dist_placement' AS query
 ), dist_placement_check AS (
     SELECT count(distinct result) = 1 AS matches
     FROM dist_placement_summary CROSS JOIN LATERAL
@@ -120,6 +120,55 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
 SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');

 \c - - - :master_port
+-- test synchronization for pg_dist_node flags
+SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
+
+\c - - - :worker_1_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :worker_2_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :master_port
+SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
+SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
+SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
+\c - - - :worker_1_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :worker_2_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :master_port
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+
+\c - - - :worker_1_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :worker_2_port
+SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :master_port
+-- verify that mx workers are updated when disabling/activating nodes
+SELECT citus_disable_node('localhost', :worker_1_port);
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+
+\c - - - :worker_2_port
+SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :master_port
+SET client_min_messages TO ERROR;
+SELECT citus_activate_node('localhost', :worker_1_port);
+
+\c - - - :worker_2_port
+SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport;
+
+\c - - - :master_port
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+
 SET search_path TO "start_stop_metadata_sync";

 -- both start & stop metadata sync operations can be transactional