Merge pull request #2928 from citusdata/master_update_node

Propagate metadata for master_update_node
Hadi Moshayedi 2019-09-18 09:40:28 -07:00 committed by GitHub
commit 09d4efadcf
29 changed files with 1306 additions and 136 deletions


@ -249,6 +249,35 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, co
}
/*
* StartWorkerListConnections starts connections to the given worker list and
* returns them as a MultiConnection list.
*/
List *
StartWorkerListConnections(List *workerNodeList, uint32 flags, const char *user,
const char *database)
{
List *connectionList = NIL;
ListCell *workerNodeCell = NULL;
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
MultiConnection *connection = NULL;
int connectionFlags = 0;
connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
user, database);
connectionList = lappend(connectionList, connection);
}
return connectionList;
}
/*
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
*


@ -443,9 +443,9 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
/*
* store result if result has been set, when the user is not interested in the result
* a NULL pointer could be passed and the result will be cleared
* a NULL pointer could be passed and the result will be cleared.
*/
if (result)
if (result != NULL)
{
*result = localResult;
}
@ -454,6 +454,7 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
PQclear(localResult);
ForgetResults(connection);
}
return RESPONSE_OKAY;
}


@ -38,11 +38,13 @@
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_node.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "foreign/foreign.h"
#include "nodes/pg_list.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
@ -52,6 +54,8 @@
static char * LocalGroupIdUpdateCommand(int32 groupId);
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
static void UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum,
bool value);
static List * SequenceDDLCommandsForTable(Oid relationId);
static void EnsureSupportedSequenceColumnType(Oid sequenceOid);
static Oid TypeOfColumn(Oid tableId, int16 columnId);
@ -59,18 +63,15 @@ static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static List * DetachPartitionCommandList(void);
static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
/*
* start_metadata_sync_to_node function creates the metadata in a worker for preparing the
* worker for accepting queries. The function first sets the localGroupId of the worker
* so that the worker knows which tuple in pg_dist_node table represents itself. After
* that, SQL statements for re-creating metadata of MX-eligible distributed tables are
* sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node
* is marked as true.
* start_metadata_sync_to_node function sets hasmetadata column of the given
* node to true, and then synchronizes the metadata on the node.
*/
Datum
start_metadata_sync_to_node(PG_FUNCTION_ARGS)
@ -78,14 +79,12 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
char *extensionOwner = CitusExtensionOwnerName();
char *escapedNodeName = quote_literal_cstr(nodeNameString);
WorkerNode *workerNode = NULL;
char *localGroupIdUpdateCommand = NULL;
List *recreateMetadataSnapshotCommandList = NIL;
List *dropMetadataCommandList = NIL;
List *createMetadataCommandList = NIL;
/* fail if metadata synchronization doesn't succeed */
bool raiseInterrupts = true;
EnsureCoordinator();
EnsureSuperUser();
@ -94,6 +93,8 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
PreventInTransactionBlock(true, "start_metadata_sync_to_node");
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)
{
@ -123,31 +124,8 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/* generate and add the local group id's update query */
localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
/* generate the queries which drop the metadata */
dropMetadataCommandList = MetadataDropCommands();
/* generate the queries which create the metadata from scratch */
createMetadataCommandList = MetadataCreateCommands();
recreateMetadataSnapshotCommandList = lappend(recreateMetadataSnapshotCommandList,
localGroupIdUpdateCommand);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
dropMetadataCommandList);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
createMetadataCommandList);
/*
* Send the snapshot recreation commands in a single remote transaction and
* error out in any kind of failure. Note that it is not required to send
* createMetadataSnapshotCommandList in the same transaction that we send
* nodeDeleteCommand and nodeInsertCommand commands below.
*/
EnsureNoModificationsHaveBeenDone();
SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
recreateMetadataSnapshotCommandList);
SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
PG_RETURN_VOID();
}
@ -170,6 +148,8 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
EnsureSuperUser();
CheckCitusVersion(ERROR);
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)
{
@ -178,6 +158,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
}
MarkNodeHasMetadata(nodeNameString, nodePort, false);
MarkNodeMetadataSynced(nodeNameString, nodePort, false);
PG_RETURN_VOID();
}
@ -237,6 +218,105 @@ ShouldSyncTableMetadata(Oid relationId)
}
/*
* SyncMetadataSnapshotToNode does the following:
* 1. Sets the localGroupId on the worker so the worker knows which tuple in
* pg_dist_node represents itself.
* 2. Recreates the distributed metadata on the given worker.
* If raiseOnError is true, it errors out if synchronization fails.
*/
static bool
SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
{
char *extensionOwner = CitusExtensionOwnerName();
/* generate and add the local group id's update query */
char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
/* generate the queries which drop the metadata */
List *dropMetadataCommandList = MetadataDropCommands();
/* generate the queries which create the metadata from scratch */
List *createMetadataCommandList = MetadataCreateCommands();
List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
dropMetadataCommandList);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
createMetadataCommandList);
/*
* Send the snapshot recreation commands in a single remote transaction and
* if requested, error out in any kind of failure. Note that it is not
* required to send createMetadataSnapshotCommandList in the same transaction
* that we send nodeDeleteCommand and nodeInsertCommand commands below.
*/
if (raiseOnError)
{
SendCommandListToWorkerInSingleTransaction(workerNode->workerName,
workerNode->workerPort,
extensionOwner,
recreateMetadataSnapshotCommandList);
return true;
}
else
{
bool success =
SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
workerNode->workerPort,
extensionOwner,
recreateMetadataSnapshotCommandList);
return success;
}
}
/*
* SendOptionalCommandListToWorkerInTransaction sends the given command list to
* the given worker in a single transaction. If any of the commands fail, it
* rolls back the transaction; otherwise it commits.
*/
bool
SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,
char *nodeUser, List *commandList)
{
MultiConnection *workerConnection = NULL;
ListCell *commandCell = NULL;
int connectionFlags = FORCE_NEW_CONNECTION;
bool failed = false;
workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
nodeUser, NULL);
RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */
foreach(commandCell, commandList)
{
char *commandString = lfirst(commandCell);
if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
{
failed = true;
break;
}
}
if (failed)
{
RemoteTransactionAbort(workerConnection);
}
else
{
RemoteTransactionCommit(workerConnection);
}
CloseConnection(workerConnection);
return !failed;
}
/*
* MetadataCreateCommands returns list of queries that are
* required to create the current metadata snapshot of the node that the
@ -500,13 +580,14 @@ NodeListInsertCommand(List *workerNodeList)
/* generate the query without any values yet */
appendStringInfo(nodeListInsertCommand,
"INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
"noderack, hasmetadata, isactive, noderole, nodecluster) VALUES ");
"noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES ");
/* iterate over the worker nodes, add the values */
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
@ -514,13 +595,14 @@ NodeListInsertCommand(List *workerNodeList)
char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
appendStringInfo(nodeListInsertCommand,
"(%d, %d, %s, %d, %s, %s, %s, '%s'::noderole, %s)",
"(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s)",
workerNode->nodeId,
workerNode->groupId,
quote_literal_cstr(workerNode->workerName),
workerNode->workerPort,
quote_literal_cstr(workerNode->workerRack),
hasMetadataString,
metadataSyncedString,
isActiveString,
nodeRoleString,
quote_literal_cstr(workerNode->nodeCluster));
@ -863,10 +945,36 @@ LocalGroupIdUpdateCommand(int32 groupId)
/*
* MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in
* pg_dist_node to true.
* pg_dist_node to hasMetadata.
*/
static void
MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata)
{
UpdateDistNodeBoolAttr(nodeName, nodePort,
Anum_pg_dist_node_hasmetadata,
hasMetadata);
}
/*
* MarkNodeMetadataSynced function sets the metadatasynced column of the
* specified worker in pg_dist_node to the given value.
*/
void
MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced)
{
UpdateDistNodeBoolAttr(nodeName, nodePort,
Anum_pg_dist_node_metadatasynced,
synced);
}
/*
* UpdateDistNodeBoolAttr updates a boolean attribute of the specified worker
* to the given value.
*/
static void
UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum, bool value)
{
const bool indexOK = false;
@ -899,9 +1007,9 @@ MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata)
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
replace[Anum_pg_dist_node_hasmetadata - 1] = true;
values[attrNum - 1] = BoolGetDatum(value);
isnull[attrNum - 1] = false;
replace[attrNum - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
@ -1197,3 +1305,46 @@ DetachPartitionCommandList(void)
return detachPartitionCommandList;
}
/*
* SyncMetadataToNodes tries recreating the metadata snapshot in the
* metadata workers that are out of sync. Returns false if synchronization
* to at least one of the workers fails.
*/
bool
SyncMetadataToNodes(void)
{
List *workerList = NIL;
ListCell *workerCell = NULL;
bool result = true;
if (!IsCoordinator())
{
return true;
}
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerList = ActivePrimaryNodeList(NoLock);
foreach(workerCell, workerList)
{
WorkerNode *workerNode = lfirst(workerCell);
if (workerNode->hasMetadata && !workerNode->metadataSynced)
{
bool raiseInterrupts = false;
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
{
result = false;
}
else
{
MarkNodeMetadataSynced(workerNode->workerName,
workerNode->workerPort, true);
}
}
}
return result;
}
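The condition checked above (hasMetadata && !metadataSynced) maps to a simple catalog query. A minimal SQL sketch of the same selection, handy for inspecting which workers the maintenance daemon still has to resync (the query is illustrative and not part of this patch; the column names are the ones used throughout this commit):

SELECT nodename, nodeport
FROM pg_dist_node
WHERE hasmetadata AND NOT metadatasynced;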


@ -31,6 +31,7 @@
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
@ -605,6 +606,30 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.metadata_sync_interval",
gettext_noop("Sets the time to wait between metadata syncs."),
gettext_noop("metadata sync needs to run every so often "
"to synchronize metadata to metadata nodes "
"that are out of sync."),
&MetadataSyncInterval,
60000, 1, 7 * 24 * 3600 * 1000,
PGC_SIGHUP,
GUC_UNIT_MS | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.metadata_sync_retry_interval",
gettext_noop("Sets the interval to retry failed metadata syncs."),
gettext_noop("metadata sync needs to run every so often "
"to synchronize metadata to metadata nodes "
"that are out of sync."),
&MetadataSyncRetryInterval,
5000, 1, 7 * 24 * 3600 * 1000,
PGC_SIGHUP,
GUC_UNIT_MS | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.select_opens_transaction_block",
gettext_noop("Open transaction blocks for SELECT commands"),

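Both GUCs are PGC_SIGHUP, so they can be changed without a restart. A usage sketch mirroring what the new regression test in this commit does (the values are the test's, chosen to keep sync well under the test's 15-second wait, not recommendations):

ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();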

@ -143,4 +143,8 @@ REVOKE ALL ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) FROM
REVOKE ALL ON FUNCTION master_add_node(text,int,int,noderole,name) FROM PUBLIC;
REVOKE ALL ON FUNCTION master_add_secondary_node(text,int,text,int,name) FROM PUBLIC;
ALTER TABLE pg_dist_node ADD COLUMN metadatasynced BOOLEAN DEFAULT FALSE;
COMMENT ON COLUMN pg_dist_node.metadatasynced IS
'indicates whether the node has the most recent metadata';
RESET search_path;
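Since the column is added with a DEFAULT, existing pg_dist_node rows pick it up as a missing-value attribute rather than forcing a table rewrite (on PostgreSQL 11+). A hedged way to confirm that after the upgrade, in the same spirit as the pg_attribute check in this commit's test output:

SELECT attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE attrelid = 'pg_dist_node'::regclass AND attname = 'metadatasynced';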


@ -14,14 +14,20 @@
#include "fmgr.h"
#include "catalog/pg_type.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "postmaster/postmaster.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/array.h"
#include "utils/builtins.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_metadata_snapshot);
PG_FUNCTION_INFO_V1(wait_until_metadata_sync);
/*
@ -62,3 +68,34 @@ master_metadata_snapshot(PG_FUNCTION_ARGS)
PG_RETURN_ARRAYTYPE_P(snapshotCommandArrayType);
}
/*
* wait_until_metadata_sync waits until the maintenance daemon does a metadata
* sync, or times out.
*/
Datum
wait_until_metadata_sync(PG_FUNCTION_ARGS)
{
uint32 timeout = PG_GETARG_UINT32(0);
int waitResult = 0;
MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
"localhost", PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
waitResult = WaitLatchOrSocket(NULL, WL_SOCKET_READABLE | WL_TIMEOUT,
PQsocket(connection->pgConn), timeout, 0);
if (waitResult & WL_SOCKET_MASK)
{
ClearResults(connection, true);
}
else if (waitResult & WL_TIMEOUT)
{
elog(WARNING, "waiting for metadata sync timed out");
}
CloseConnection(connection);
PG_RETURN_VOID();
}
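The UDF is only exposed through the regression tests. The new test file in this commit creates it as shown below; the final SELECT is an illustrative call, with the timeout given in milliseconds:

CREATE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
SELECT wait_until_metadata_sync(15000);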


@ -359,8 +359,8 @@ StartRemoteTransactionAbort(MultiConnection *connection)
* [PREPARED]. If we've previously sent a PREPARE TRANSACTION, we always
* want to wait for that result, as that shouldn't take long and will
* reserve resources. But if there's another query running, we don't want
* to wait, because a longrunning statement may be running, force it to be
* killed in that case.
* to wait, because a long running statement may be running, so force it to
* be killed in that case.
*/
if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
transaction->transactionState == REMOTE_TRANS_PREPARED)
@ -619,7 +619,7 @@ RemoteTransactionsBeginIfNecessary(List *connectionList)
/*
* If a transaction already is in progress (including having failed),
* don't start it again. Thats quite normal if a piece of code allows
* don't start it again. That's quite normal if a piece of code allows
* cached connections.
*/
if (transaction->transactionState != REMOTE_TRANS_INVALID)
@ -708,7 +708,7 @@ HandleRemoteTransactionResultError(MultiConnection *connection, PGresult *result
* If the connection is marked as critical, and allowErrorPromotion is true,
* this routine will ERROR out. The allowErrorPromotion case is primarily
* required for the transaction management code itself. Usually it is helpful
* to fail as soon as possible. If !allowErrorPromotion transaction commit
* to fail as soon as possible. If !allowErrorPromotion transaction commit
* will instead issue an error before committing on any node.
*/
void


@ -26,6 +26,7 @@
#include "catalog/pg_extension.h"
#include "citus_version.h"
#include "catalog/pg_namespace.h"
#include "commands/async.h"
#include "commands/extension.h"
#include "libpq/pqsignal.h"
#include "catalog/namespace.h"
@ -33,11 +34,13 @@
#include "distributed/maintenanced.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/statistics_collection.h"
#include "distributed/transaction_recovery.h"
#include "distributed/version_compat.h"
#include "nodes/makefuncs.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "nodes/makefuncs.h"
#include "storage/ipc.h"
#include "storage/proc.h"
@ -48,7 +51,6 @@
#include "utils/memutils.h"
#include "utils/lsyscache.h"
/*
* Shared memory data for all maintenance workers.
*/
@ -77,6 +79,7 @@ typedef struct MaintenanceDaemonDBData
Oid userOid;
bool daemonStarted;
pid_t workerPid;
bool triggerMetadataSync;
Latch *latch; /* pointer to the background worker's latch */
} MaintenanceDaemonDBData;
@ -84,6 +87,10 @@ typedef struct MaintenanceDaemonDBData
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
int Recover2PCInterval = 60000;
/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
int MetadataSyncRetryInterval = 5000;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
@ -100,6 +107,7 @@ static size_t MaintenanceDaemonShmemSize(void);
static void MaintenanceDaemonShmemInit(void);
static void MaintenanceDaemonErrorContext(void *arg);
static bool LockCitusExtension(void);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
/*
@ -185,6 +193,7 @@ InitializeMaintenanceDaemonBackend(void)
dbData->daemonStarted = true;
dbData->workerPid = 0;
dbData->triggerMetadataSync = false;
LWLockRelease(&MaintenanceDaemonControl->lock);
WaitForBackgroundWorkerStartup(handle, &pid);
@ -225,6 +234,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
ErrorContextCallback errorCallback;
TimestampTz lastRecoveryTime = 0;
TimestampTz nextMetadataSyncTime = 0;
/*
* Look up this worker's configuration.
@ -356,6 +366,36 @@ CitusMaintenanceDaemonMain(Datum main_arg)
}
#endif
if (MetadataSyncTriggeredCheckAndReset(myDbData) ||
GetCurrentTimestamp() >= nextMetadataSyncTime)
{
bool metadataSyncFailed = false;
int64 nextTimeout = 0;
InvalidateMetadataSystemCache();
StartTransactionCommand();
if (!LockCitusExtension())
{
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
"skipping metadata sync")));
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
metadataSyncFailed = !SyncMetadataToNodes();
Async_Notify(METADATA_SYNC_CHANNEL, NULL);
}
CommitTransactionCommand();
ProcessCompletedNotifies();
nextTimeout = metadataSyncFailed ? MetadataSyncRetryInterval :
MetadataSyncInterval;
nextMetadataSyncTime =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
timeout = Min(timeout, nextTimeout);
}
/*
* If enabled, run 2PC recovery on primary nodes (where !RecoveryInProgress()),
* since we'll write to the pg_dist_transaction log.
@ -466,6 +506,14 @@ CitusMaintenanceDaemonMain(Datum main_arg)
/* check for changed configuration */
if (myDbData->userOid != GetSessionUserId())
{
/*
* Reset myDbData->daemonStarted so InitializeMaintenanceDaemonBackend()
* notices this is a restart.
*/
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
myDbData->daemonStarted = false;
LWLockRelease(&MaintenanceDaemonControl->lock);
/* return code of 1 requests worker restart */
proc_exit(1);
}
@ -657,3 +705,49 @@ StopMaintenanceDaemon(Oid databaseId)
kill(workerPid, SIGTERM);
}
}
/*
* TriggerMetadataSync triggers the maintenance daemon to do a metadata sync for
* the given database.
*/
void
TriggerMetadataSync(Oid databaseId)
{
bool found = false;
MaintenanceDaemonDBData *dbData = NULL;
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash,
&databaseId, HASH_FIND, &found);
if (found)
{
dbData->triggerMetadataSync = true;
/* set latch to wake-up the maintenance loop */
SetLatch(dbData->latch);
}
LWLockRelease(&MaintenanceDaemonControl->lock);
}
/*
* MetadataSyncTriggeredCheckAndReset checks if metadata sync has been
* triggered for the given database, and resets the flag.
*/
static bool
MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
{
bool metadataSyncTriggered = false;
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
metadataSyncTriggered = dbData->triggerMetadataSync;
dbData->triggerMetadataSync = false;
LWLockRelease(&MaintenanceDaemonControl->lock);
return metadataSyncTriggered;
}


@ -581,7 +581,10 @@ LookupNodeByNodeId(uint32 nodeId)
WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex];
if (workerNode->nodeId == nodeId)
{
return workerNode;
WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
return workerNodeCopy;
}
}
@ -2858,6 +2861,7 @@ InitializeWorkerNodeCache(void)
workerNode->nodeId = currentNode->nodeId;
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
workerNode->hasMetadata = currentNode->hasMetadata;
workerNode->metadataSynced = currentNode->metadataSynced;
workerNode->isActive = currentNode->isActive;
workerNode->nodeRole = currentNode->nodeRole;
strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN);


@ -23,6 +23,7 @@
#include "distributed/citus_acquire_lock.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/maintenanced.h"
#include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
@ -31,6 +32,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_node.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
@ -58,6 +60,7 @@ typedef struct NodeMetadata
int32 groupId;
char *nodeRack;
bool hasMetadata;
bool metadataSynced;
bool isActive;
Oid nodeRole;
char *nodeCluster;
@ -78,6 +81,7 @@ static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAll(void);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_add_node);
@ -493,7 +497,7 @@ master_update_node(PG_FUNCTION_ARGS)
/*
* force is used when an update needs to happen regardless of conflicting locks. This
* feature is important to force the update during a failover due to failure, eg. by
* a high-availability system such as pg_auto_failover. The strategy is a to start a
* a high-availability system such as pg_auto_failover. The strategy is to start a
* background worker that actively cancels backends holding conflicting locks with
* this backend.
*
@ -578,6 +582,26 @@ master_update_node(PG_FUNCTION_ARGS)
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH);
workerNode->workerPort = newNodePort;
/*
* Propagate the updated pg_dist_node entry to all metadata workers.
* citus-ha uses master_update_node() in a prepared transaction, and
* we don't support coordinated prepared transactions, so we cannot
* propagate the changes to the worker nodes here. Instead we mark
* all metadata nodes as not-synced and ask maintenanced to do the
* propagation.
*
* It is possible that the maintenance daemon does the first resync too
* early, but that's fine, since this will start a retry loop with
* 5-second intervals until sync is complete.
*/
if (UnsetMetadataSyncedForAll())
{
TriggerMetadataSync(MyDatabaseId);
}
if (handle != NULL)
{
/*
@ -1255,6 +1279,8 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeMetadata->nodeRack);
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(nodeMetadata->hasMetadata);
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(
nodeMetadata->metadataSynced);
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive);
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole);
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
@ -1463,6 +1489,7 @@ ParseWorkerNodeFileAndRename()
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort;
workerNode->hasMetadata = false;
workerNode->metadataSynced = false;
workerNode->isActive = true;
workerNodeList = lappend(workerNodeList, workerNode);
@ -1519,6 +1546,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]);
workerNode->metadataSynced =
DatumGetBool(datumArray[Anum_pg_dist_node_metadatasynced - 1]);
workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]);
workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]);
@ -1574,3 +1603,69 @@ DatumToString(Datum datum, Oid dataType)
return outputString;
}
/*
* UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata
* nodes to false. It returns true if it updated at least one node.
*/
static bool
UnsetMetadataSyncedForAll(void)
{
bool updatedAtLeastOne = false;
Relation relation = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[2];
int scanKeyCount = 2;
bool indexOK = false;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
relation = heap_open(DistNodeRelationId(), ExclusiveLock);
tupleDescriptor = RelationGetDescr(relation);
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata,
BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced,
BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
scanDescriptor = systable_beginscan(relation,
InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
updatedAtLeastOne = true;
}
while (HeapTupleIsValid(heapTuple))
{
HeapTuple newHeapTuple = NULL;
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
memset(replace, false, sizeof(replace));
memset(isnull, false, sizeof(isnull));
memset(values, 0, sizeof(values));
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
newHeapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
replace);
CatalogTupleUpdate(relation, &newHeapTuple->t_self, newHeapTuple);
CommandCounterIncrement();
heap_freetuple(newHeapTuple);
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(relation, NoLock);
return updatedAtLeastOne;
}
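Taken together, the intended failover flow looks roughly like the sketch below. The host names and port are placeholders, and the final wait uses the test-only wait_until_metadata_sync() helper added elsewhere in this commit:

-- move the node; this marks all metadata workers as metadatasynced = false
SELECT master_update_node(
    (SELECT nodeid FROM pg_dist_node WHERE nodename = 'old-host' AND nodeport = 5432),
    'new-host', 5432);
-- the maintenance daemon propagates the change; wait for it (test helper)
SELECT wait_until_metadata_sync();
-- metadata workers should now report metadatasynced = true again
SELECT nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;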


@ -177,8 +177,8 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort,
tableOwner, commandList);
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
commandList);
}
}


@ -182,6 +182,8 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
int32 port, const char *user, const
char *database);
extern List * StartWorkerListConnections(List *workerList, uint32 flags, const char *user,
const char *database);
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *hostname,
int32 port,


@ -22,6 +22,7 @@
extern double DistributedDeadlockDetectionTimeoutFactor;
extern void StopMaintenanceDaemon(Oid databaseId);
extern void TriggerMetadataSync(Oid databaseId);
extern void InitializeMaintenanceDaemon(void);
extern void InitializeMaintenanceDaemonBackend(void);


@ -16,6 +16,9 @@
#include "distributed/metadata_cache.h"
#include "nodes/pg_list.h"
/* config variables */
extern int MetadataSyncInterval;
extern int MetadataSyncRetryInterval;
/* Functions declarations for metadata syncing */
extern bool ClusterHasKnownMetadataWorkers(void);
@ -37,7 +40,11 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, int32 groupId);
extern void CreateTableMetadataOnWorkers(Oid relationId);
extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced);
extern bool SyncMetadataToNodes(void);
extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,
char *nodeUser,
List *commandList);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
@ -57,5 +64,6 @@ extern void CreateTableMetadataOnWorkers(Oid relationId);
"shardstate = EXCLUDED.shardstate, " \
"shardlength = EXCLUDED.shardlength, " \
"groupid = EXCLUDED.groupid"
#define METADATA_SYNC_CHANNEL "metadata_sync"
#endif /* METADATA_SYNC_H */


@ -20,7 +20,7 @@
* in particular their OUT parameters) must be changed whenever the definition of
* pg_dist_node changes.
*/
#define Natts_pg_dist_node 9
#define Natts_pg_dist_node 10
#define Anum_pg_dist_node_nodeid 1
#define Anum_pg_dist_node_groupid 2
#define Anum_pg_dist_node_nodename 3
@ -30,6 +30,7 @@
#define Anum_pg_dist_node_isactive 7
#define Anum_pg_dist_node_noderole 8
#define Anum_pg_dist_node_nodecluster 9
#define Anum_pg_dist_node_metadatasynced 10
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"


@ -47,6 +47,7 @@ typedef struct WorkerNode
bool isActive; /* node's state */
Oid nodeRole; /* the node's role in its group */
char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */
bool metadataSynced; /* node has the most recent metadata */
} WorkerNode;


@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
264 263 f
266 265 f
transactionnumberwaitingtransactionnumbers
263
264 263
265
266 265
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
268 267 f
269 267 f
269 268 t
270 269 f
271 269 f
271 270 t
transactionnumberwaitingtransactionnumbers
267
268 267
269 267,268
269
270 269
271 269,270
step s1-abort:
ABORT;


@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
265 264 f
267 266 f
transactionnumberwaitingtransactionnumbers
264
265 264
266
267 266
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
269 268 f
270 268 f
270 269 t
271 270 f
272 270 f
272 271 t
transactionnumberwaitingtransactionnumbers
268
269 268
270 268,269
270
271 270
272 270,271
step s1-abort:
ABORT;


@ -22,13 +22,14 @@ step s2-update-node-2:
(select nodeid from pg_dist_node where nodeport = 57638),
'localhost',
58638);
?column?
1
<waiting ...>
step s1-commit:
COMMIT;
step s2-update-node-2: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodeid, nodename, nodeport, isactive
FROM pg_dist_node
@ -71,7 +72,9 @@ step s1-commit:
COMMIT;
step s2-update-node-1: <... completed>
error in steps s1-commit s2-update-node-1: ERROR: tuple concurrently updated
?column?
1
step s2-abort:
ABORT;
@ -86,3 +89,47 @@ nodeid nodename nodeport isactive
24 localhost 58637 t
nodeid nodename nodeport
starting permutation: s1-begin s1-update-node-1 s2-start-metadata-sync-node-2 s1-commit s2-verify-metadata
nodeid nodename nodeport
26 localhost 57637
27 localhost 57638
step s1-begin:
BEGIN;
step s1-update-node-1:
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
58637);
?column?
1
step s2-start-metadata-sync-node-2:
SELECT start_metadata_sync_to_node('localhost', 57638);
<waiting ...>
step s1-commit:
COMMIT;
step s2-start-metadata-sync-node-2: <... completed>
start_metadata_sync_to_node
step s2-verify-metadata:
SELECT nodeid, groupid, nodename, nodeport FROM pg_dist_node ORDER BY nodeid;
SELECT master_run_on_worker(
ARRAY['localhost'], ARRAY[57638],
ARRAY['SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport) ORDER BY nodeid) FROM pg_dist_node'],
false);
nodeid groupid nodename nodeport
26 26 localhost 58637
27 27 localhost 57638
master_run_on_worker
(localhost,57638,t,"[{""f1"": 26, ""f2"": 26, ""f3"": ""localhost"", ""f4"": 58637}, {""f1"": 27, ""f2"": 27, ""f3"": ""localhost"", ""f4"": 57638}]")
nodeid nodename nodeport


@ -301,14 +301,19 @@ SELECT master_remove_node('localhost', 9990);
-- clean-up
DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
-- check that adding/removing nodes are propagated to nodes with metadata
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
@ -336,8 +341,13 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
(0 rows)
\c - - - :master_port
-- check that added nodes are not propagated to nodes with hasmetadata=false
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
-- check that added nodes are not propagated to nodes without metadata
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
@ -376,10 +386,10 @@ SELECT
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
11 | 9 | localhost | 57637 | default | f | t | primary | default
12 | 10 | localhost | 57638 | default | f | t | primary | default
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------
11 | 9 | localhost | 57637 | default | f | t | primary | default | f
12 | 10 | localhost | 57638 | default | f | t | primary | default | f
(2 rows)
-- check that mixed add/remove node commands work fine inside transaction
@ -408,7 +418,12 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
----------+----------
(0 rows)
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
@ -593,11 +608,11 @@ CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 a
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster)
VALUES ('localhost', 5000, 1000, 'primary', 'olap');
ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
DETAIL: Failing row contains (19, 1000, localhost, 5000, default, f, t, primary, olap).
DETAIL: Failing row contains (19, 1000, localhost, 5000, default, f, t, primary, olap, f).
UPDATE pg_dist_node SET nodecluster = 'olap'
WHERE nodeport = :worker_1_port;
ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap).
DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f).
-- check that you /can/ add a secondary node to a non-default cluster
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap');
@ -620,9 +635,9 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole =
(1 row)
SELECT * FROM pg_dist_node WHERE nodeport=8887;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------
24 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------+----------------
24 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f
(1 row)
-- don't remove the secondary and unavailable nodes, check that no commands are sent to
@ -663,9 +678,9 @@ SELECT master_update_node(:worker_1_node, 'somehost', 9000);
(1 row)
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+----------+----------+----------+-------------+----------+----------+-------------
16 | 14 | somehost | 9000 | default | f | t | primary | default
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+----------+----------+----------+-------------+----------+----------+-------------+----------------
16 | 14 | somehost | 9000 | default | f | t | primary | default | f
(1 row)
-- cleanup
@ -676,8 +691,8 @@ SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
(1 row)
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
16 | 14 | localhost | 57637 | default | f | t | primary | default
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------
16 | 14 | localhost | 57637 | default | f | t | primary | default | f
(1 row)


@ -5,11 +5,12 @@ SELECT attrelid::regclass, attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE atthasmissing
ORDER BY attrelid, attname;
attrelid | attname | atthasmissing | attmissingval
--------------+-------------+---------------+---------------
pg_dist_node | hasmetadata | t | {f}
pg_dist_node | isactive | t | {t}
pg_dist_node | nodecluster | t | {default}
pg_dist_node | noderole | t | {primary}
(4 rows)
attrelid | attname | atthasmissing | attmissingval
--------------+----------------+---------------+---------------
pg_dist_node | hasmetadata | t | {f}
pg_dist_node | isactive | t | {t}
pg_dist_node | metadatasynced | t | {f}
pg_dist_node | nodecluster | t | {default}
pg_dist_node | noderole | t | {primary}
(5 rows)


@ -27,9 +27,9 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands,
-- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
unnest
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE pg_dist_node CASCADE
(3 rows)
@ -60,7 +60,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE public.mx_test_table OWNER TO postgres
ALTER TABLE public.mx_test_table OWNER TO postgres
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -81,7 +81,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE public.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -105,7 +105,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -133,7 +133,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -154,7 +154,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -233,12 +233,12 @@ SELECT * FROM pg_dist_local_group;
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default
2 | 2 | localhost | 57638 | default | f | t | primary | default
4 | 1 | localhost | 8888 | default | f | t | secondary | default
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | f
2 | 2 | localhost | 57638 | default | f | t | primary | default | f
4 | 1 | localhost | 8888 | default | f | t | secondary | default | f
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f
(4 rows)
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
@ -372,12 +372,12 @@ SELECT * FROM pg_dist_local_group;
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default
2 | 2 | localhost | 57638 | default | f | t | primary | default
4 | 1 | localhost | 8888 | default | f | t | secondary | default
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t
2 | 2 | localhost | 57638 | default | f | t | primary | default | f
4 | 1 | localhost | 8888 | default | f | t | secondary | default | f
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f
(4 rows)
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;

View File

@ -0,0 +1,422 @@
-- Test creation of mx tables and metadata syncing
SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
SELECT nextval('pg_catalog.pg_dist_shardid_seq') AS last_shard_id \gset
SET citus.replication_model TO streaming;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
CREATE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
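-- wait_until_metadata_sync is a C test UDF from the citus library; as used here it
-- should block until the maintenance daemon has synced metadata to every node with
-- hasmetadata=true, or until the timeout (in milliseconds) expires.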
-- Verifies that pg_dist_node and pg_dist_placement on the given worker match the ones on the coordinator
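-- It runs the same jsonb_agg query on the worker and on the coordinator through
-- master_run_on_worker and checks that the two serialized results are identical.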
CREATE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
RETURNS BOOLEAN
LANGUAGE sql
AS $$
WITH dist_node_summary AS (
SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
), dist_node_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_node_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
  SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_placement_summary.query, dist_placement_summary.query],
false)
)
SELECT dist_node_check.matches AND dist_placement_check.matches
FROM dist_node_check CROSS JOIN dist_placement_check
$$;
-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
RETURNS TEXT
LANGUAGE sql
AS $$
SELECT master_run_on_worker(ARRAY[hostname], ARRAY[port],
ARRAY['ALTER SYSTEM SET default_transaction_read_only TO ' || isreadonly::TEXT], false);
SELECT result FROM
master_run_on_worker(ARRAY[hostname], ARRAY[port],
ARRAY['SELECT pg_reload_conf()'], false);
$$;
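-- Since mark_node_readonly uses ALTER SYSTEM plus pg_reload_conf(), new
-- transactions on that worker default to read-only afterwards, so metadata
-- writes from the coordinator should fail. A quick sanity check (illustrative
-- only, not executed as part of this test) would be:
--   SELECT result FROM master_run_on_worker(ARRAY['localhost'], ARRAY[:worker_2_port],
--     ARRAY['SHOW default_transaction_read_only'], false);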
-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) AS nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57637 | f | f
(1 row)
-- create a couple of tables
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');
create_reference_table
------------------------
(1 row)
CREATE TABLE dist_table_1(a int primary key, b int references ref_table(a));
SELECT create_distributed_table('dist_table_1', 'a');
create_distributed_table
--------------------------
(1 row)
-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
'localhost', :worker_2_port);
?column?
----------
1
(1 row)
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57638 | f | f
(1 row)
-- start syncing metadata to the node
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57638 | t | t
(1 row)
--------------------------------------------------------------------------
-- Test that maintenance daemon syncs after master_update_node
--------------------------------------------------------------------------
-- Update the node again. We do this in a REPEATABLE READ transaction, so we only
-- see the changes made by master_update_node(). This avoids inconsistent results
-- if the maintenance daemon does the metadata sync too quickly.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57638 | t | t
(1 row)
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57637 | t | f
(1 row)
END;
-- wait until maintenance daemon does the next metadata sync, and then
-- check if metadata is synced again
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | t
(1 row)
SELECT verify_metadata('localhost', :worker_1_port);
verify_metadata
-----------------
t
(1 row)
-- Update the node to a non-existent node. This is to simulate updating to
-- an unwritable node.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57637 | t | t
(1 row)
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
?column?
----------
1
(1 row)
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 12345 | t | f
(1 row)
END;
-- maintenance daemon metadata sync should fail, because the node is still unwritable.
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
(1 row)
-- update it back to :worker_1_port; now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | t
(1 row)
--------------------------------------------------------------------------
-- Test updating a node when another node is in readonly-mode
--------------------------------------------------------------------------
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
NOTICE: Replicating reference table "ref_table" to the node localhost:57638
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- Create a table with shards on both nodes
CREATE TABLE dist_table_2(a int);
SELECT create_distributed_table('dist_table_2', 'a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i;
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
mark_node_readonly
--------------------
t
(1 row)
-- Updating the other node (worker 1) should now mark worker 2 as not synced,
-- since the metadata change cannot be written to the read-only worker.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
?column?
----------
1
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
3 | t | f
(2 rows)
COMMIT;
-- worker_2 is out of sync, so further updates aren't sent to it and we should
-- not see any warnings.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456);
?column?
----------
1
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
3 | t | f
(2 rows)
-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
mark_node_readonly
--------------------
t
(1 row)
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
-- Mark the node readonly again, so the following master_update_node warns
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
mark_node_readonly
--------------------
t
(1 row)
-- Revert the nodeport of worker 1.
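-- The count(*) inside the transaction below should already go through the
-- reverted node address, i.e. the master_update_node change is visible to
-- queries before commit.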
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)
SELECT count(*) FROM dist_table_2;
count
-------
100
(1 row)
END;
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
mark_node_readonly
--------------------
t
(1 row)
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)
--------------------------------------------------------------------------
-- Test that master_update_node rolls back properly
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
?column?
----------
1
(1 row)
ROLLBACK;
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)
--------------------------------------------------------------------------
-- Test that master_update_node can appear in a prepared transaction.
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
?column?
----------
1
(1 row)
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
3 | t | t
(2 rows)
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | t
3 | t | t
(2 rows)
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)
-- cleanup
DROP TABLE dist_table_1, ref_table, dist_table_2;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
count
-------
2
(1 row)
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART :last_shard_id;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.replication_model;

View File

@ -23,11 +23,11 @@ INSERT INTO dest_table (a, b) VALUES (1, 1);
INSERT INTO dest_table (a, b) VALUES (2, 1);
INSERT INTO source_table (a, b) VALUES (10, 10);
-- simulate actually having secondary nodes
SELECT * FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
1 | 1 | localhost | 57637 | default | f | t | primary | default
2 | 2 | localhost | 57638 | default | f | t | primary | default
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+----------+----------+-------------
1 | 1 | localhost | 57637 | default | t | primary | default
2 | 2 | localhost | 57638 | default | t | primary | default
(2 rows)
UPDATE pg_dist_node SET noderole = 'secondary';

View File

@ -14,6 +14,7 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: multi_mx_master_update_node
test: multi_cluster_management
test: multi_test_helpers

View File

@ -70,6 +70,20 @@ step "s2-update-node-2"
58638);
}
step "s2-verify-metadata"
{
SELECT nodeid, groupid, nodename, nodeport FROM pg_dist_node ORDER BY nodeid;
SELECT master_run_on_worker(
ARRAY['localhost'], ARRAY[57638],
ARRAY['SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport) ORDER BY nodeid) FROM pg_dist_node'],
false);
}
step "s2-start-metadata-sync-node-2"
{
SELECT start_metadata_sync_to_node('localhost', 57638);
}
step "s2-abort"
{
ABORT;
@ -85,3 +99,8 @@ permutation "s1-begin" "s1-update-node-1" "s2-update-node-2" "s1-commit" "s1-sho
# session 1 updates node 1, session 2 tries to do the same
permutation "s1-begin" "s1-update-node-1" "s2-begin" "s2-update-node-1" "s1-commit" "s2-abort" "s1-show-nodes"
# master_update_node should block start_metadata_sync_to_node. Note that we
# cannot run start_metadata_sync_to_node in a transaction, so we're not
# testing the reverse order here.
permutation "s1-begin" "s1-update-node-1" "s2-start-metadata-sync-node-2" "s1-commit" "s2-verify-metadata"

View File

@ -134,9 +134,9 @@ SELECT master_remove_node('localhost', 9990);
-- clean-up
DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
-- check that adding/removing nodes are propagated to nodes with metadata
SELECT master_remove_node('localhost', :worker_2_port);
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
@ -146,8 +146,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port
-- check that added nodes are not propagated to nodes with hasmetadata=false
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
-- check that added nodes are not propagated to nodes without metadata
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
@ -174,7 +174,7 @@ COMMIT;
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT master_remove_node('localhost', :worker_2_port);

View File

@ -0,0 +1,212 @@
-- Test creation of mx tables and metadata syncing
SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
SELECT nextval('pg_catalog.pg_dist_shardid_seq') AS last_shard_id \gset
SET citus.replication_model TO streaming;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
CREATE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
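-- wait_until_metadata_sync is a C test UDF from the citus library; as used here it
-- should block until the maintenance daemon has synced metadata to every node with
-- hasmetadata=true, or until the timeout (in milliseconds) expires.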
-- Verifies that pg_dist_node and pg_dist_placement on the given worker match the ones on the coordinator
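-- It runs the same jsonb_agg query on the worker and on the coordinator through
-- master_run_on_worker and checks that the two serialized results are identical.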
CREATE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
RETURNS BOOLEAN
LANGUAGE sql
AS $$
WITH dist_node_summary AS (
SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
), dist_node_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_node_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
  SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_placement_summary.query, dist_placement_summary.query],
false)
)
SELECT dist_node_check.matches AND dist_placement_check.matches
FROM dist_node_check CROSS JOIN dist_placement_check
$$;
-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
RETURNS TEXT
LANGUAGE sql
AS $$
SELECT master_run_on_worker(ARRAY[hostname], ARRAY[port],
ARRAY['ALTER SYSTEM SET default_transaction_read_only TO ' || isreadonly::TEXT], false);
SELECT result FROM
master_run_on_worker(ARRAY[hostname], ARRAY[port],
ARRAY['SELECT pg_reload_conf()'], false);
$$;
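-- Since mark_node_readonly uses ALTER SYSTEM plus pg_reload_conf(), new
-- transactions on that worker default to read-only afterwards, so metadata
-- writes from the coordinator should fail. A quick sanity check (illustrative
-- only, not executed as part of this test) would be:
--   SELECT result FROM master_run_on_worker(ARRAY['localhost'], ARRAY[:worker_2_port],
--     ARRAY['SHOW default_transaction_read_only'], false);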
-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) AS nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
-- create a couple of tables
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');
CREATE TABLE dist_table_1(a int primary key, b int references ref_table(a));
SELECT create_distributed_table('dist_table_1', 'a');
-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
'localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
-- start syncing metadata to the node
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
--------------------------------------------------------------------------
-- Test that maintenance daemon syncs after master_update_node
--------------------------------------------------------------------------
-- Update the node again. We do this in a REPEATABLE READ transaction, so we only
-- see the changes made by master_update_node(). This avoids inconsistent results
-- if the maintenance daemon does the metadata sync too quickly.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;
-- wait until maintenance daemon does the next metadata sync, and then
-- check if metadata is synced again
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT verify_metadata('localhost', :worker_1_port);
-- Update the node to a non-existent node. This is to simulate updating to
-- an unwritable node.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;
-- maintenance daemon metadata sync should fail, because the node is still unwritable.
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
-- update it back to :worker_1_port; now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
--------------------------------------------------------------------------
-- Test updating a node when another node is in readonly-mode
--------------------------------------------------------------------------
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
-- Create a table with shards on both nodes
CREATE TABLE dist_table_2(a int);
SELECT create_distributed_table('dist_table_2', 'a');
INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i;
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
-- Updating the other node (worker 1) should now mark worker 2 as not synced,
-- since the metadata change cannot be written to the read-only worker.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
COMMIT;
-- worker_2 is out of sync, so further updates aren't sent to it and we should
-- not see any warnings.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync();
-- Mark the node readonly again, so the following master_update_node warns
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
-- Revert the nodeport of worker 1.
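-- The count(*) inside the transaction below should already go through the
-- reverted node address, i.e. the master_update_node change is visible to
-- queries before commit.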
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT count(*) FROM dist_table_2;
END;
SELECT wait_until_metadata_sync();
-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync();
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
--------------------------------------------------------------------------
-- Test that master_update_node rolls back properly
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
ROLLBACK;
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
--------------------------------------------------------------------------
-- Test that master_update_node can appear in a prepared transaction.
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
-- cleanup
DROP TABLE dist_table_1, ref_table, dist_table_2;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART :last_shard_id;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.replication_model;

View File

@ -19,7 +19,7 @@ INSERT INTO dest_table (a, b) VALUES (2, 1);
INSERT INTO source_table (a, b) VALUES (10, 10);
-- simulate actually having secondary nodes
SELECT * FROM pg_dist_node;
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
UPDATE pg_dist_node SET noderole = 'secondary';
\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'"