mirror of https://github.com/citusdata/citus.git
Add metadatasynced, and sync on master_update_node()
Co-authored-by: pykello <hadi.moshayedi@microsoft.com> Co-authored-by: serprex <serprex@users.noreply.github.com>pull/2928/head
parent
db5d03931d
commit
76f3933b05
|
@ -249,6 +249,35 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, co
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartWorkerListConnections starts connections to the given worker list and
|
||||
* returns them as a MultiConnection list.
|
||||
*/
|
||||
List *
|
||||
StartWorkerListConnections(List *workerNodeList, uint32 flags, const char *user,
|
||||
const char *database)
|
||||
{
|
||||
List *connectionList = NIL;
|
||||
ListCell *workerNodeCell = NULL;
|
||||
|
||||
foreach(workerNodeCell, workerNodeList)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||
char *nodeName = workerNode->workerName;
|
||||
int nodePort = workerNode->workerPort;
|
||||
MultiConnection *connection = NULL;
|
||||
int connectionFlags = 0;
|
||||
|
||||
connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
|
||||
user, database);
|
||||
|
||||
connectionList = lappend(connectionList, connection);
|
||||
}
|
||||
|
||||
return connectionList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
|
||||
*
|
||||
|
|
|
@ -443,9 +443,9 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
|
|||
|
||||
/*
|
||||
* store result if result has been set, when the user is not interested in the result
|
||||
* a NULL pointer could be passed and the result will be cleared
|
||||
* a NULL pointer could be passed and the result will be cleared.
|
||||
*/
|
||||
if (result)
|
||||
if (result != NULL)
|
||||
{
|
||||
*result = localResult;
|
||||
}
|
||||
|
@ -454,6 +454,7 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
|
|||
PQclear(localResult);
|
||||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
return RESPONSE_OKAY;
|
||||
}
|
||||
|
||||
|
|
|
@ -38,11 +38,13 @@
|
|||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/pg_dist_node.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "foreign/foreign.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
@ -52,6 +54,8 @@
|
|||
|
||||
static char * LocalGroupIdUpdateCommand(int32 groupId);
|
||||
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
|
||||
static void UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum,
|
||||
bool value);
|
||||
static List * SequenceDDLCommandsForTable(Oid relationId);
|
||||
static void EnsureSupportedSequenceColumnType(Oid sequenceOid);
|
||||
static Oid TypeOfColumn(Oid tableId, int16 columnId);
|
||||
|
@ -59,18 +63,15 @@ static char * TruncateTriggerCreateCommand(Oid relationId);
|
|||
static char * SchemaOwnerName(Oid objectId);
|
||||
static bool HasMetadataWorkers(void);
|
||||
static List * DetachPartitionCommandList(void);
|
||||
static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
|
||||
|
||||
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
|
||||
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
|
||||
|
||||
|
||||
/*
|
||||
* start_metadata_sync_to_node function creates the metadata in a worker for preparing the
|
||||
* worker for accepting queries. The function first sets the localGroupId of the worker
|
||||
* so that the worker knows which tuple in pg_dist_node table represents itself. After
|
||||
* that, SQL statements for re-creating metadata of MX-eligible distributed tables are
|
||||
* sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node
|
||||
* is marked as true.
|
||||
* start_metadata_sync_to_node function sets hasmetadata column of the given
|
||||
* node to true, and then synchronizes the metadata on the node.
|
||||
*/
|
||||
Datum
|
||||
start_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
||||
|
@ -78,14 +79,12 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
|||
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||
int32 nodePort = PG_GETARG_INT32(1);
|
||||
char *nodeNameString = text_to_cstring(nodeName);
|
||||
char *extensionOwner = CitusExtensionOwnerName();
|
||||
char *escapedNodeName = quote_literal_cstr(nodeNameString);
|
||||
|
||||
WorkerNode *workerNode = NULL;
|
||||
char *localGroupIdUpdateCommand = NULL;
|
||||
List *recreateMetadataSnapshotCommandList = NIL;
|
||||
List *dropMetadataCommandList = NIL;
|
||||
List *createMetadataCommandList = NIL;
|
||||
|
||||
/* fail if metadata synchronization doesn't succeed */
|
||||
bool raiseInterrupts = true;
|
||||
|
||||
EnsureCoordinator();
|
||||
EnsureSuperUser();
|
||||
|
@ -94,6 +93,8 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
|||
|
||||
PreventInTransactionBlock(true, "start_metadata_sync_to_node");
|
||||
|
||||
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
|
||||
|
||||
workerNode = FindWorkerNode(nodeNameString, nodePort);
|
||||
if (workerNode == NULL)
|
||||
{
|
||||
|
@ -123,31 +124,8 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
|||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/* generate and add the local group id's update query */
|
||||
localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
|
||||
|
||||
/* generate the queries which drop the metadata */
|
||||
dropMetadataCommandList = MetadataDropCommands();
|
||||
|
||||
/* generate the queries which create the metadata from scratch */
|
||||
createMetadataCommandList = MetadataCreateCommands();
|
||||
|
||||
recreateMetadataSnapshotCommandList = lappend(recreateMetadataSnapshotCommandList,
|
||||
localGroupIdUpdateCommand);
|
||||
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
|
||||
dropMetadataCommandList);
|
||||
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
|
||||
createMetadataCommandList);
|
||||
|
||||
/*
|
||||
* Send the snapshot recreation commands in a single remote transaction and
|
||||
* error out in any kind of failure. Note that it is not required to send
|
||||
* createMetadataSnapshotCommandList in the same transaction that we send
|
||||
* nodeDeleteCommand and nodeInsertCommand commands below.
|
||||
*/
|
||||
EnsureNoModificationsHaveBeenDone();
|
||||
SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
|
||||
recreateMetadataSnapshotCommandList);
|
||||
SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
|
||||
MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -170,6 +148,8 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
|||
EnsureSuperUser();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
|
||||
|
||||
workerNode = FindWorkerNode(nodeNameString, nodePort);
|
||||
if (workerNode == NULL)
|
||||
{
|
||||
|
@ -178,6 +158,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
MarkNodeHasMetadata(nodeNameString, nodePort, false);
|
||||
MarkNodeMetadataSynced(nodeNameString, nodePort, false);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -237,6 +218,105 @@ ShouldSyncTableMetadata(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SyncMetadataSnapshotToNode does the following:
|
||||
* 1. Sets the localGroupId on the worker so the worker knows which tuple in
|
||||
* pg_dist_node represents itself.
|
||||
* 2. Recreates the distributed metadata on the given worker.
|
||||
* If raiseOnError is true, it errors out if synchronization fails.
|
||||
*/
|
||||
static bool
|
||||
SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
|
||||
{
|
||||
char *extensionOwner = CitusExtensionOwnerName();
|
||||
|
||||
/* generate and add the local group id's update query */
|
||||
char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
|
||||
|
||||
/* generate the queries which drop the metadata */
|
||||
List *dropMetadataCommandList = MetadataDropCommands();
|
||||
|
||||
/* generate the queries which create the metadata from scratch */
|
||||
List *createMetadataCommandList = MetadataCreateCommands();
|
||||
|
||||
List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand);
|
||||
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
|
||||
dropMetadataCommandList);
|
||||
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
|
||||
createMetadataCommandList);
|
||||
|
||||
/*
|
||||
* Send the snapshot recreation commands in a single remote transaction and
|
||||
* if requested, error out in any kind of failure. Note that it is not
|
||||
* required to send createMetadataSnapshotCommandList in the same transaction
|
||||
* that we send nodeDeleteCommand and nodeInsertCommand commands below.
|
||||
*/
|
||||
if (raiseOnError)
|
||||
{
|
||||
SendCommandListToWorkerInSingleTransaction(workerNode->workerName,
|
||||
workerNode->workerPort,
|
||||
extensionOwner,
|
||||
recreateMetadataSnapshotCommandList);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
bool success =
|
||||
SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
|
||||
workerNode->workerPort,
|
||||
extensionOwner,
|
||||
recreateMetadataSnapshotCommandList);
|
||||
return success;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendOptionalCommandListToWorkerInTransaction sends the given command list to
|
||||
* the given worker in a single transaction. If any of the commands fail, it
|
||||
* rollbacks the transaction, and otherwise commits.
|
||||
*/
|
||||
bool
|
||||
SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,
|
||||
char *nodeUser, List *commandList)
|
||||
{
|
||||
MultiConnection *workerConnection = NULL;
|
||||
ListCell *commandCell = NULL;
|
||||
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||
bool failed = false;
|
||||
|
||||
workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
|
||||
nodeUser, NULL);
|
||||
|
||||
RemoteTransactionBegin(workerConnection);
|
||||
|
||||
/* iterate over the commands and execute them in the same connection */
|
||||
foreach(commandCell, commandList)
|
||||
{
|
||||
char *commandString = lfirst(commandCell);
|
||||
|
||||
if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
|
||||
{
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failed)
|
||||
{
|
||||
RemoteTransactionAbort(workerConnection);
|
||||
}
|
||||
else
|
||||
{
|
||||
RemoteTransactionCommit(workerConnection);
|
||||
}
|
||||
|
||||
CloseConnection(workerConnection);
|
||||
|
||||
return !failed;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MetadataCreateCommands returns list of queries that are
|
||||
* required to create the current metadata snapshot of the node that the
|
||||
|
@ -500,13 +580,14 @@ NodeListInsertCommand(List *workerNodeList)
|
|||
/* generate the query without any values yet */
|
||||
appendStringInfo(nodeListInsertCommand,
|
||||
"INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
|
||||
"noderack, hasmetadata, isactive, noderole, nodecluster) VALUES ");
|
||||
"noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES ");
|
||||
|
||||
/* iterate over the worker nodes, add the values */
|
||||
foreach(workerNodeCell, workerNodeList)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||
char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
|
||||
char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
|
||||
char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
|
||||
|
||||
Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
|
||||
|
@ -514,13 +595,14 @@ NodeListInsertCommand(List *workerNodeList)
|
|||
char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
|
||||
|
||||
appendStringInfo(nodeListInsertCommand,
|
||||
"(%d, %d, %s, %d, %s, %s, %s, '%s'::noderole, %s)",
|
||||
"(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s)",
|
||||
workerNode->nodeId,
|
||||
workerNode->groupId,
|
||||
quote_literal_cstr(workerNode->workerName),
|
||||
workerNode->workerPort,
|
||||
quote_literal_cstr(workerNode->workerRack),
|
||||
hasMetadataString,
|
||||
metadataSyncedString,
|
||||
isActiveString,
|
||||
nodeRoleString,
|
||||
quote_literal_cstr(workerNode->nodeCluster));
|
||||
|
@ -863,10 +945,36 @@ LocalGroupIdUpdateCommand(int32 groupId)
|
|||
|
||||
/*
|
||||
* MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in
|
||||
* pg_dist_node to true.
|
||||
* pg_dist_node to hasMetadata.
|
||||
*/
|
||||
static void
|
||||
MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata)
|
||||
{
|
||||
UpdateDistNodeBoolAttr(nodeName, nodePort,
|
||||
Anum_pg_dist_node_hasmetadata,
|
||||
hasMetadata);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkNodeMetadataSynced function sets the metadatasynced column of the
|
||||
* specified worker in pg_dist_node to the given value.
|
||||
*/
|
||||
void
|
||||
MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced)
|
||||
{
|
||||
UpdateDistNodeBoolAttr(nodeName, nodePort,
|
||||
Anum_pg_dist_node_metadatasynced,
|
||||
synced);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateDistNodeBoolAttr updates a boolean attribute of the specified worker
|
||||
* to the given value.
|
||||
*/
|
||||
static void
|
||||
UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum, bool value)
|
||||
{
|
||||
const bool indexOK = false;
|
||||
|
||||
|
@ -899,9 +1007,9 @@ MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata)
|
|||
|
||||
memset(replace, 0, sizeof(replace));
|
||||
|
||||
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
|
||||
isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
|
||||
replace[Anum_pg_dist_node_hasmetadata - 1] = true;
|
||||
values[attrNum - 1] = BoolGetDatum(value);
|
||||
isnull[attrNum - 1] = false;
|
||||
replace[attrNum - 1] = true;
|
||||
|
||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
||||
|
||||
|
@ -1197,3 +1305,46 @@ DetachPartitionCommandList(void)
|
|||
|
||||
return detachPartitionCommandList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SyncMetadataToNodes tries recreating the metadata snapshot in the
|
||||
* metadata workers that are out of sync. Returns false if synchronization
|
||||
* to at least one of the workers fails.
|
||||
*/
|
||||
bool
|
||||
SyncMetadataToNodes(void)
|
||||
{
|
||||
List *workerList = NIL;
|
||||
ListCell *workerCell = NULL;
|
||||
bool result = true;
|
||||
|
||||
if (!IsCoordinator())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
|
||||
|
||||
workerList = ActivePrimaryNodeList(NoLock);
|
||||
foreach(workerCell, workerList)
|
||||
{
|
||||
WorkerNode *workerNode = lfirst(workerCell);
|
||||
|
||||
if (workerNode->hasMetadata && !workerNode->metadataSynced)
|
||||
{
|
||||
bool raiseInterrupts = false;
|
||||
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
|
||||
{
|
||||
result = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
MarkNodeMetadataSynced(workerNode->workerName,
|
||||
workerNode->workerPort, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_explain.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
|
@ -605,6 +606,30 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.metadata_sync_interval",
|
||||
gettext_noop("Sets the time to wait between metadata syncs."),
|
||||
gettext_noop("metadata sync needs to run every so often "
|
||||
"to synchronize metadata to metadata nodes "
|
||||
"that are out of sync."),
|
||||
&MetadataSyncInterval,
|
||||
60000, 1, 7 * 24 * 3600 * 1000,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MS | GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.metadata_sync_retry_interval",
|
||||
gettext_noop("Sets the interval to retry failed metadata syncs."),
|
||||
gettext_noop("metadata sync needs to run every so often "
|
||||
"to synchronize metadata to metadata nodes "
|
||||
"that are out of sync."),
|
||||
&MetadataSyncRetryInterval,
|
||||
5000, 1, 7 * 24 * 3600 * 1000,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MS | GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.select_opens_transaction_block",
|
||||
gettext_noop("Open transaction blocks for SELECT commands"),
|
||||
|
|
|
@ -143,4 +143,8 @@ REVOKE ALL ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) FROM
|
|||
REVOKE ALL ON FUNCTION master_add_node(text,int,int,noderole,name) FROM PUBLIC;
|
||||
REVOKE ALL ON FUNCTION master_add_secondary_node(text,int,text,int,name) FROM PUBLIC;
|
||||
|
||||
ALTER TABLE pg_dist_node ADD COLUMN metadatasynced BOOLEAN DEFAULT FALSE;
|
||||
COMMENT ON COLUMN pg_dist_node.metadatasynced IS
|
||||
'indicates whether the node has the most recent metadata';
|
||||
|
||||
RESET search_path;
|
||||
|
|
|
@ -14,14 +14,20 @@
|
|||
#include "fmgr.h"
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/array.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(master_metadata_snapshot);
|
||||
PG_FUNCTION_INFO_V1(wait_until_metadata_sync);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -62,3 +68,34 @@ master_metadata_snapshot(PG_FUNCTION_ARGS)
|
|||
|
||||
PG_RETURN_ARRAYTYPE_P(snapshotCommandArrayType);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* wait_until_metadata_sync waits until the maintenance daemon does a metadata
|
||||
* sync, or times out.
|
||||
*/
|
||||
Datum
|
||||
wait_until_metadata_sync(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint32 timeout = PG_GETARG_UINT32(0);
|
||||
int waitResult = 0;
|
||||
|
||||
MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
|
||||
"localhost", PostPortNumber);
|
||||
ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
|
||||
|
||||
waitResult = WaitLatchOrSocket(NULL, WL_SOCKET_READABLE | WL_TIMEOUT,
|
||||
PQsocket(connection->pgConn), timeout, 0);
|
||||
if (waitResult & WL_SOCKET_MASK)
|
||||
{
|
||||
ClearResults(connection, true);
|
||||
}
|
||||
else if (waitResult & WL_TIMEOUT)
|
||||
{
|
||||
elog(WARNING, "waiting for metadata sync timed out");
|
||||
}
|
||||
|
||||
CloseConnection(connection);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -359,8 +359,8 @@ StartRemoteTransactionAbort(MultiConnection *connection)
|
|||
* [PREPARED]. If we've previously sent a PREPARE TRANSACTION, we always
|
||||
* want to wait for that result, as that shouldn't take long and will
|
||||
* reserve resources. But if there's another query running, we don't want
|
||||
* to wait, because a longrunning statement may be running, force it to be
|
||||
* killed in that case.
|
||||
* to wait, because a long running statement may be running, so force it to
|
||||
* be killed in that case.
|
||||
*/
|
||||
if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
|
||||
transaction->transactionState == REMOTE_TRANS_PREPARED)
|
||||
|
@ -619,7 +619,7 @@ RemoteTransactionsBeginIfNecessary(List *connectionList)
|
|||
|
||||
/*
|
||||
* If a transaction already is in progress (including having failed),
|
||||
* don't start it again. Thats quite normal if a piece of code allows
|
||||
* don't start it again. That's quite normal if a piece of code allows
|
||||
* cached connections.
|
||||
*/
|
||||
if (transaction->transactionState != REMOTE_TRANS_INVALID)
|
||||
|
@ -708,7 +708,7 @@ HandleRemoteTransactionResultError(MultiConnection *connection, PGresult *result
|
|||
* If the connection is marked as critical, and allowErrorPromotion is true,
|
||||
* this routine will ERROR out. The allowErrorPromotion case is primarily
|
||||
* required for the transaction management code itself. Usually it is helpful
|
||||
* to fail as soon as possible. If !allowErrorPromotion transaction commit
|
||||
* to fail as soon as possible. If !allowErrorPromotion transaction commit
|
||||
* will instead issue an error before committing on any node.
|
||||
*/
|
||||
void
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "catalog/pg_extension.h"
|
||||
#include "citus_version.h"
|
||||
#include "catalog/pg_namespace.h"
|
||||
#include "commands/async.h"
|
||||
#include "commands/extension.h"
|
||||
#include "libpq/pqsignal.h"
|
||||
#include "catalog/namespace.h"
|
||||
|
@ -33,11 +34,13 @@
|
|||
#include "distributed/maintenanced.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/statistics_collection.h"
|
||||
#include "distributed/transaction_recovery.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/proc.h"
|
||||
|
@ -48,7 +51,6 @@
|
|||
#include "utils/memutils.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
|
||||
/*
|
||||
* Shared memory data for all maintenance workers.
|
||||
*/
|
||||
|
@ -77,6 +79,7 @@ typedef struct MaintenanceDaemonDBData
|
|||
Oid userOid;
|
||||
bool daemonStarted;
|
||||
pid_t workerPid;
|
||||
bool triggerMetadataSync;
|
||||
Latch *latch; /* pointer to the background worker's latch */
|
||||
} MaintenanceDaemonDBData;
|
||||
|
||||
|
@ -84,6 +87,10 @@ typedef struct MaintenanceDaemonDBData
|
|||
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
|
||||
int Recover2PCInterval = 60000;
|
||||
|
||||
/* config variables for metadata sync timeout */
|
||||
int MetadataSyncInterval = 60000;
|
||||
int MetadataSyncRetryInterval = 5000;
|
||||
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
|
||||
|
||||
|
@ -100,6 +107,7 @@ static size_t MaintenanceDaemonShmemSize(void);
|
|||
static void MaintenanceDaemonShmemInit(void);
|
||||
static void MaintenanceDaemonErrorContext(void *arg);
|
||||
static bool LockCitusExtension(void);
|
||||
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -185,6 +193,7 @@ InitializeMaintenanceDaemonBackend(void)
|
|||
|
||||
dbData->daemonStarted = true;
|
||||
dbData->workerPid = 0;
|
||||
dbData->triggerMetadataSync = false;
|
||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||
|
||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
||||
|
@ -225,6 +234,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
|
||||
ErrorContextCallback errorCallback;
|
||||
TimestampTz lastRecoveryTime = 0;
|
||||
TimestampTz nextMetadataSyncTime = 0;
|
||||
|
||||
/*
|
||||
* Look up this worker's configuration.
|
||||
|
@ -356,6 +366,36 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
}
|
||||
#endif
|
||||
|
||||
if (MetadataSyncTriggeredCheckAndReset(myDbData) ||
|
||||
GetCurrentTimestamp() >= nextMetadataSyncTime)
|
||||
{
|
||||
bool metadataSyncFailed = false;
|
||||
int64 nextTimeout = 0;
|
||||
|
||||
InvalidateMetadataSystemCache();
|
||||
StartTransactionCommand();
|
||||
|
||||
if (!LockCitusExtension())
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
||||
"skipping metadata sync")));
|
||||
}
|
||||
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
||||
{
|
||||
metadataSyncFailed = !SyncMetadataToNodes();
|
||||
Async_Notify(METADATA_SYNC_CHANNEL, NULL);
|
||||
}
|
||||
|
||||
CommitTransactionCommand();
|
||||
ProcessCompletedNotifies();
|
||||
|
||||
nextTimeout = metadataSyncFailed ? MetadataSyncRetryInterval :
|
||||
MetadataSyncInterval;
|
||||
nextMetadataSyncTime =
|
||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
|
||||
timeout = Min(timeout, nextTimeout);
|
||||
}
|
||||
|
||||
/*
|
||||
* If enabled, run 2PC recovery on primary nodes (where !RecoveryInProgress()),
|
||||
* since we'll write to the pg_dist_transaction log.
|
||||
|
@ -466,6 +506,14 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
/* check for changed configuration */
|
||||
if (myDbData->userOid != GetSessionUserId())
|
||||
{
|
||||
/*
|
||||
* Reset myDbData->daemonStarted so InitializeMaintenanceDaemonBackend()
|
||||
* notices this is a restart.
|
||||
*/
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
myDbData->daemonStarted = false;
|
||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||
|
||||
/* return code of 1 requests worker restart */
|
||||
proc_exit(1);
|
||||
}
|
||||
|
@ -657,3 +705,49 @@ StopMaintenanceDaemon(Oid databaseId)
|
|||
kill(workerPid, SIGTERM);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TriggerMetadataSync triggers the maintenance daemon to do a metadata sync for
|
||||
* the given database.
|
||||
*/
|
||||
void
|
||||
TriggerMetadataSync(Oid databaseId)
|
||||
{
|
||||
bool found = false;
|
||||
MaintenanceDaemonDBData *dbData = NULL;
|
||||
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash,
|
||||
&databaseId, HASH_FIND, &found);
|
||||
if (found)
|
||||
{
|
||||
dbData->triggerMetadataSync = true;
|
||||
|
||||
/* set latch to wake-up the maintenance loop */
|
||||
SetLatch(dbData->latch);
|
||||
}
|
||||
|
||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MetadataSyncTriggeredCheckAndReset checks if metadata sync has been
|
||||
* triggered for the given database, and resets the flag.
|
||||
*/
|
||||
static bool
|
||||
MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
|
||||
{
|
||||
bool metadataSyncTriggered = false;
|
||||
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
metadataSyncTriggered = dbData->triggerMetadataSync;
|
||||
dbData->triggerMetadataSync = false;
|
||||
|
||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||
|
||||
return metadataSyncTriggered;
|
||||
}
|
||||
|
|
|
@ -581,7 +581,10 @@ LookupNodeByNodeId(uint32 nodeId)
|
|||
WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex];
|
||||
if (workerNode->nodeId == nodeId)
|
||||
{
|
||||
return workerNode;
|
||||
WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
|
||||
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
|
||||
|
||||
return workerNodeCopy;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2858,6 +2861,7 @@ InitializeWorkerNodeCache(void)
|
|||
workerNode->nodeId = currentNode->nodeId;
|
||||
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
|
||||
workerNode->hasMetadata = currentNode->hasMetadata;
|
||||
workerNode->metadataSynced = currentNode->metadataSynced;
|
||||
workerNode->isActive = currentNode->isActive;
|
||||
workerNode->nodeRole = currentNode->nodeRole;
|
||||
strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN);
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "distributed/citus_acquire_lock.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/maintenanced.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -31,6 +32,7 @@
|
|||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/pg_dist_node.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
|
@ -58,12 +60,16 @@ typedef struct NodeMetadata
|
|||
int32 groupId;
|
||||
char *nodeRack;
|
||||
bool hasMetadata;
|
||||
bool metadataSynced;
|
||||
bool isActive;
|
||||
Oid nodeRole;
|
||||
char *nodeCluster;
|
||||
} NodeMetadata;
|
||||
|
||||
/* local function forward declarations */
|
||||
static List * WorkerListDelete(List *workerList, uint32 nodeId);
|
||||
static List * SyncedMetadataNodeList(void);
|
||||
static void SyncDistNodeEntryToNodes(WorkerNode *workerNode, List *metadataWorkers);
|
||||
static int ActivateNode(char *nodeName, int nodePort);
|
||||
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
|
||||
static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
|
||||
|
@ -78,6 +84,9 @@ static void DeleteNodeRow(char *nodename, int32 nodeport);
|
|||
static List * ParseWorkerNodeFileAndRename(void);
|
||||
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
|
||||
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
|
||||
static bool * SendOptionalCommandListToWorkers(List *workerNodeList,
|
||||
List *commandList,
|
||||
const char *nodeUser);
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(master_add_node);
|
||||
|
@ -493,7 +502,7 @@ master_update_node(PG_FUNCTION_ARGS)
|
|||
/*
|
||||
* force is used when an update needs to happen regardless of conflicting locks. This
|
||||
* feature is important to force the update during a failover due to failure, eg. by
|
||||
* a high-availability system such as pg_auto_failover. The strategy is a to start a
|
||||
* a high-availability system such as pg_auto_failover. The strategy is to start a
|
||||
* background worker that actively cancels backends holding conflicting locks with
|
||||
* this backend.
|
||||
*
|
||||
|
@ -506,6 +515,7 @@ master_update_node(PG_FUNCTION_ARGS)
|
|||
WorkerNode *workerNode = NULL;
|
||||
WorkerNode *workerNodeWithSameAddress = NULL;
|
||||
List *placementList = NIL;
|
||||
List *metadataWorkersToSync = NIL;
|
||||
BackgroundWorkerHandle *handle = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
@ -578,6 +588,30 @@ master_update_node(PG_FUNCTION_ARGS)
|
|||
|
||||
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
|
||||
|
||||
strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH);
|
||||
workerNode->workerPort = newNodePort;
|
||||
|
||||
/*
|
||||
* Propagate the updated pg_dist_node entry to all metadata workers.
|
||||
* The usual case is that the new node is in stand-by mode, so we don't
|
||||
* sync the changes to the new node, and instead schedule it for being
|
||||
* synced by the maintenance daemon.
|
||||
*
|
||||
* It is possible that maintenance daemon does the first resync too
|
||||
* early, but that's fine, since this will start a retry loop with
|
||||
* 5 second intervals until sync is complete.
|
||||
*/
|
||||
metadataWorkersToSync = SyncedMetadataNodeList();
|
||||
if (workerNode->hasMetadata)
|
||||
{
|
||||
metadataWorkersToSync = WorkerListDelete(metadataWorkersToSync, nodeId);
|
||||
MarkNodeMetadataSynced(workerNode->workerName,
|
||||
workerNode->workerPort, false);
|
||||
TriggerMetadataSync(MyDatabaseId);
|
||||
}
|
||||
|
||||
SyncDistNodeEntryToNodes(workerNode, metadataWorkersToSync);
|
||||
|
||||
if (handle != NULL)
|
||||
{
|
||||
/*
|
||||
|
@ -591,6 +625,176 @@ master_update_node(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/* MetadataNodeList returns list of all synced metadata workers. */
|
||||
static List *
|
||||
SyncedMetadataNodeList(void)
|
||||
{
|
||||
List *metadataNodeList = NIL;
|
||||
List *activePrimaries = ActivePrimaryNodeList(AccessShareLock);
|
||||
ListCell *workerNodeCell = NULL;
|
||||
|
||||
foreach(workerNodeCell, activePrimaries)
|
||||
{
|
||||
WorkerNode *workerNode = lfirst(workerNodeCell);
|
||||
if (workerNode->hasMetadata && workerNode->metadataSynced)
|
||||
{
|
||||
metadataNodeList = lappend(metadataNodeList, workerNode);
|
||||
}
|
||||
}
|
||||
|
||||
return metadataNodeList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WorkerListDelete removes the worker node with the given id
|
||||
* from the list of given workers.
|
||||
*/
|
||||
static List *
|
||||
WorkerListDelete(List *workerList, uint32 nodeId)
|
||||
{
|
||||
List *filteredWorkerList = NIL;
|
||||
ListCell *workerCell = NULL;
|
||||
|
||||
foreach(workerCell, workerList)
|
||||
{
|
||||
WorkerNode *workerNode = lfirst(workerCell);
|
||||
if (workerNode->nodeId != nodeId)
|
||||
{
|
||||
filteredWorkerList = lappend(filteredWorkerList, workerNode);
|
||||
}
|
||||
}
|
||||
|
||||
return filteredWorkerList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SyncDistNodeEntryToNodes synchronizes the corresponding entry for
|
||||
* the given workerNode in pg_dist_node metadata table of given
|
||||
* metadataWorkers. If syncing to a node fails, pg_distnode.metadatasynced
|
||||
* is set to false.
|
||||
*/
|
||||
static void
|
||||
SyncDistNodeEntryToNodes(WorkerNode *nodeToSync, List *metadataWorkers)
|
||||
{
|
||||
ListCell *workerCell = NULL;
|
||||
char *extensionOwner = CitusExtensionOwnerName();
|
||||
char *nodeDeleteCommand = NodeDeleteCommand(nodeToSync->nodeId);
|
||||
char *nodeInsertCommand = NodeListInsertCommand(list_make1(nodeToSync));
|
||||
List *commandList = list_make2(nodeDeleteCommand, nodeInsertCommand);
|
||||
bool *workerFailed = SendOptionalCommandListToWorkers(metadataWorkers, commandList,
|
||||
extensionOwner);
|
||||
|
||||
int workerIndex = 0;
|
||||
foreach(workerCell, metadataWorkers)
|
||||
{
|
||||
WorkerNode *workerNode = lfirst(workerCell);
|
||||
|
||||
if (workerFailed[workerIndex])
|
||||
{
|
||||
MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, false);
|
||||
}
|
||||
|
||||
workerIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendOptionalCommandListToWorkers sends the given command list to the given
|
||||
* worker list, and returns a bool[] indicating whether execution of the command
|
||||
* list failed at a worker or not.
|
||||
*
|
||||
* If execution fails at a worker because of connection error, this function just
|
||||
* emits a warning for that node. If execution fails because of a result error,
|
||||
* the node is rolled-back to the status before this function so further queries
|
||||
* can be sent to the node.
|
||||
*/
|
||||
static bool *
|
||||
SendOptionalCommandListToWorkers(List *workerNodeList, List *commandList,
|
||||
const char *nodeUser)
|
||||
{
|
||||
List *connectionList = NIL;
|
||||
ListCell *commandCell = NULL;
|
||||
ListCell *connectionCell = NULL;
|
||||
bool *workerFailed = palloc0(sizeof(workerNodeList));
|
||||
char *beginSavepointCommand = "SAVEPOINT sp_node_metadata";
|
||||
char *rollbackSavepointCommand = "ROLLBACK TO SAVEPOINT sp_node_metadata";
|
||||
char *releaseSavepointCommand = "RELEASE SAVEPOINT sp_node_metadata";
|
||||
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
|
||||
/* open connections in parallel */
|
||||
connectionList = StartWorkerListConnections(workerNodeList, 0, nodeUser, NULL);
|
||||
FinishConnectionListEstablishment(connectionList);
|
||||
|
||||
RemoteTransactionsBeginIfNecessary(connectionList);
|
||||
|
||||
commandList = lcons(beginSavepointCommand, commandList);
|
||||
commandList = lappend(commandList, releaseSavepointCommand);
|
||||
|
||||
foreach(commandCell, commandList)
|
||||
{
|
||||
char *command = lfirst(commandCell);
|
||||
int workerIndex = 0;
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
MultiConnection *connection = lfirst(connectionCell);
|
||||
|
||||
if (!workerFailed[workerIndex])
|
||||
{
|
||||
if (SendRemoteCommand(connection, command) == 0)
|
||||
{
|
||||
ReportConnectionError(connection, WARNING);
|
||||
workerFailed[workerIndex] = true;
|
||||
}
|
||||
}
|
||||
|
||||
workerIndex++;
|
||||
}
|
||||
|
||||
workerIndex = 0;
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
MultiConnection *connection = lfirst(connectionCell);
|
||||
bool raiseInterrupts = true;
|
||||
PGresult *commandResult = NULL;
|
||||
bool responseOK = false;
|
||||
|
||||
if (workerFailed[workerIndex])
|
||||
{
|
||||
workerIndex++;
|
||||
continue;
|
||||
}
|
||||
|
||||
commandResult = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||
responseOK = IsResponseOK(commandResult);
|
||||
|
||||
if (!responseOK)
|
||||
{
|
||||
ReportResultError(connection, commandResult, WARNING);
|
||||
workerFailed[workerIndex] = true;
|
||||
|
||||
PQclear(commandResult);
|
||||
ForgetResults(connection);
|
||||
|
||||
ExecuteOptionalRemoteCommand(connection, rollbackSavepointCommand, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
PQclear(commandResult);
|
||||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
workerIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
return workerFailed;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
|
||||
{
|
||||
|
@ -1255,6 +1459,8 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
|
|||
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
|
||||
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeMetadata->nodeRack);
|
||||
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(nodeMetadata->hasMetadata);
|
||||
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(
|
||||
nodeMetadata->metadataSynced);
|
||||
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive);
|
||||
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole);
|
||||
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
|
||||
|
@ -1463,6 +1669,7 @@ ParseWorkerNodeFileAndRename()
|
|||
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
|
||||
workerNode->workerPort = nodePort;
|
||||
workerNode->hasMetadata = false;
|
||||
workerNode->metadataSynced = false;
|
||||
workerNode->isActive = true;
|
||||
|
||||
workerNodeList = lappend(workerNodeList, workerNode);
|
||||
|
@ -1519,6 +1726,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
|||
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
|
||||
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
|
||||
workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]);
|
||||
workerNode->metadataSynced =
|
||||
DatumGetBool(datumArray[Anum_pg_dist_node_metadatasynced - 1]);
|
||||
workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]);
|
||||
workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]);
|
||||
|
||||
|
|
|
@ -177,8 +177,8 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
|
|||
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||
List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
|
||||
|
||||
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort,
|
||||
tableOwner, commandList);
|
||||
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
|
||||
commandList);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -182,6 +182,8 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
|
|||
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
|
||||
int32 port, const char *user, const
|
||||
char *database);
|
||||
extern List * StartWorkerListConnections(List *workerList, uint32 flags, const char *user,
|
||||
const char *database);
|
||||
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||
const char *hostname,
|
||||
int32 port,
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
extern double DistributedDeadlockDetectionTimeoutFactor;
|
||||
|
||||
extern void StopMaintenanceDaemon(Oid databaseId);
|
||||
extern void TriggerMetadataSync(Oid databaseId);
|
||||
extern void InitializeMaintenanceDaemon(void);
|
||||
extern void InitializeMaintenanceDaemonBackend(void);
|
||||
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
/* config variables */
|
||||
extern int MetadataSyncInterval;
|
||||
extern int MetadataSyncRetryInterval;
|
||||
|
||||
/* Functions declarations for metadata syncing */
|
||||
extern bool ClusterHasKnownMetadataWorkers(void);
|
||||
|
@ -37,7 +40,11 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
|
|||
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
|
||||
uint64 shardLength, int32 groupId);
|
||||
extern void CreateTableMetadataOnWorkers(Oid relationId);
|
||||
|
||||
extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced);
|
||||
extern bool SyncMetadataToNodes(void);
|
||||
extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,
|
||||
char *nodeUser,
|
||||
List *commandList);
|
||||
|
||||
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
|
||||
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
|
||||
|
@ -57,5 +64,6 @@ extern void CreateTableMetadataOnWorkers(Oid relationId);
|
|||
"shardstate = EXCLUDED.shardstate, " \
|
||||
"shardlength = EXCLUDED.shardlength, " \
|
||||
"groupid = EXCLUDED.groupid"
|
||||
#define METADATA_SYNC_CHANNEL "metadata_sync"
|
||||
|
||||
#endif /* METADATA_SYNC_H */
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
* in particular their OUT parameters) must be changed whenever the definition of
|
||||
* pg_dist_node changes.
|
||||
*/
|
||||
#define Natts_pg_dist_node 9
|
||||
#define Natts_pg_dist_node 10
|
||||
#define Anum_pg_dist_node_nodeid 1
|
||||
#define Anum_pg_dist_node_groupid 2
|
||||
#define Anum_pg_dist_node_nodename 3
|
||||
|
@ -30,6 +30,7 @@
|
|||
#define Anum_pg_dist_node_isactive 7
|
||||
#define Anum_pg_dist_node_noderole 8
|
||||
#define Anum_pg_dist_node_nodecluster 9
|
||||
#define Anum_pg_dist_node_metadatasynced 10
|
||||
|
||||
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
|
||||
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
|
||||
|
|
|
@ -47,6 +47,7 @@ typedef struct WorkerNode
|
|||
bool isActive; /* node's state */
|
||||
Oid nodeRole; /* the node's role in its group */
|
||||
char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */
|
||||
bool metadataSynced; /* node has the most recent metadata */
|
||||
} WorkerNode;
|
||||
|
||||
|
||||
|
|
|
@ -29,11 +29,11 @@ step detector-dump-wait-edges:
|
|||
|
||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||
|
||||
264 263 f
|
||||
274 273 f
|
||||
transactionnumberwaitingtransactionnumbers
|
||||
|
||||
263
|
||||
264 263
|
||||
273
|
||||
274 273
|
||||
step s1-abort:
|
||||
ABORT;
|
||||
|
||||
|
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
|
|||
|
||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||
|
||||
268 267 f
|
||||
269 267 f
|
||||
269 268 t
|
||||
278 277 f
|
||||
279 277 f
|
||||
279 278 t
|
||||
transactionnumberwaitingtransactionnumbers
|
||||
|
||||
267
|
||||
268 267
|
||||
269 267,268
|
||||
277
|
||||
278 277
|
||||
279 277,278
|
||||
step s1-abort:
|
||||
ABORT;
|
||||
|
||||
|
|
|
@ -29,11 +29,11 @@ step detector-dump-wait-edges:
|
|||
|
||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||
|
||||
265 264 f
|
||||
275 274 f
|
||||
transactionnumberwaitingtransactionnumbers
|
||||
|
||||
264
|
||||
265 264
|
||||
274
|
||||
275 274
|
||||
step s1-abort:
|
||||
ABORT;
|
||||
|
||||
|
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
|
|||
|
||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||
|
||||
269 268 f
|
||||
270 268 f
|
||||
270 269 t
|
||||
279 278 f
|
||||
280 278 f
|
||||
280 279 t
|
||||
transactionnumberwaitingtransactionnumbers
|
||||
|
||||
268
|
||||
269 268
|
||||
270 268,269
|
||||
278
|
||||
279 278
|
||||
280 278,279
|
||||
step s1-abort:
|
||||
ABORT;
|
||||
|
||||
|
|
|
@ -86,3 +86,47 @@ nodeid nodename nodeport isactive
|
|||
24 localhost 58637 t
|
||||
nodeid nodename nodeport
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-update-node-1 s2-start-metadata-sync-node-2 s1-commit s2-verify-metadata
|
||||
nodeid nodename nodeport
|
||||
|
||||
26 localhost 57637
|
||||
27 localhost 57638
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-update-node-1:
|
||||
SELECT 1 FROM master_update_node(
|
||||
(select nodeid from pg_dist_node where nodeport = 57637),
|
||||
'localhost',
|
||||
58637);
|
||||
|
||||
?column?
|
||||
|
||||
1
|
||||
step s2-start-metadata-sync-node-2:
|
||||
SELECT start_metadata_sync_to_node('localhost', 57638);
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-start-metadata-sync-node-2: <... completed>
|
||||
start_metadata_sync_to_node
|
||||
|
||||
|
||||
step s2-verify-metadata:
|
||||
SELECT nodeid, groupid, nodename, nodeport FROM pg_dist_node ORDER BY nodeid;
|
||||
SELECT master_run_on_worker(
|
||||
ARRAY['localhost'], ARRAY[57638],
|
||||
ARRAY['SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport) ORDER BY nodeid) FROM pg_dist_node'],
|
||||
false);
|
||||
|
||||
nodeid groupid nodename nodeport
|
||||
|
||||
26 26 localhost 58637
|
||||
27 27 localhost 57638
|
||||
master_run_on_worker
|
||||
|
||||
(localhost,57638,t,"[{""f1"": 26, ""f2"": 26, ""f3"": ""localhost"", ""f4"": 58637}, {""f1"": 27, ""f2"": 27, ""f3"": ""localhost"", ""f4"": 57638}]")
|
||||
nodeid nodename nodeport
|
||||
|
||||
|
|
|
@ -301,14 +301,19 @@ SELECT master_remove_node('localhost', 9990);
|
|||
|
||||
-- clean-up
|
||||
DROP TABLE cluster_management_test;
|
||||
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
|
||||
-- check that adding/removing nodes are propagated to nodes with metadata
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
----------
|
||||
|
@ -336,8 +341,13 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
|
|||
(0 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
-- check that added nodes are not propagated to nodes with hasmetadata=false
|
||||
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
|
||||
-- check that added nodes are not propagated to nodes without metadata
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
----------
|
||||
|
@ -376,10 +386,10 @@ SELECT
|
|||
(1 row)
|
||||
|
||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
||||
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
|
||||
11 | 9 | localhost | 57637 | default | f | t | primary | default
|
||||
12 | 10 | localhost | 57638 | default | f | t | primary | default
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
|
||||
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------
|
||||
11 | 9 | localhost | 57637 | default | f | t | primary | default | f
|
||||
12 | 10 | localhost | 57638 | default | f | t | primary | default | f
|
||||
(2 rows)
|
||||
|
||||
-- check that mixed add/remove node commands work fine inside transaction
|
||||
|
@ -408,7 +418,12 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
|
|||
----------+----------
|
||||
(0 rows)
|
||||
|
||||
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
|
@ -593,11 +608,11 @@ CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 a
|
|||
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster)
|
||||
VALUES ('localhost', 5000, 1000, 'primary', 'olap');
|
||||
ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
|
||||
DETAIL: Failing row contains (19, 1000, localhost, 5000, default, f, t, primary, olap).
|
||||
DETAIL: Failing row contains (19, 1000, localhost, 5000, default, f, t, primary, olap, f).
|
||||
UPDATE pg_dist_node SET nodecluster = 'olap'
|
||||
WHERE nodeport = :worker_1_port;
|
||||
ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
|
||||
DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap).
|
||||
DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f).
|
||||
-- check that you /can/ add a secondary node to a non-default cluster
|
||||
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
|
||||
SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap');
|
||||
|
@ -620,9 +635,9 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole =
|
|||
(1 row)
|
||||
|
||||
SELECT * FROM pg_dist_node WHERE nodeport=8887;
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
||||
--------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------
|
||||
24 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
|
||||
--------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------+----------------
|
||||
24 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f
|
||||
(1 row)
|
||||
|
||||
-- don't remove the secondary and unavailable nodes, check that no commands are sent to
|
||||
|
@ -663,9 +678,9 @@ SELECT master_update_node(:worker_1_node, 'somehost', 9000);
|
|||
(1 row)
|
||||
|
||||
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
||||
--------+---------+----------+----------+----------+-------------+----------+----------+-------------
|
||||
16 | 14 | somehost | 9000 | default | f | t | primary | default
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
|
||||
--------+---------+----------+----------+----------+-------------+----------+----------+-------------+----------------
|
||||
16 | 14 | somehost | 9000 | default | f | t | primary | default | f
|
||||
(1 row)
|
||||
|
||||
-- cleanup
|
||||
|
@ -676,8 +691,8 @@ SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
|
|||
(1 row)
|
||||
|
||||
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
||||
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
|
||||
16 | 14 | localhost | 57637 | default | f | t | primary | default
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
|
||||
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------
|
||||
16 | 14 | localhost | 57637 | default | f | t | primary | default | f
|
||||
(1 row)
|
||||
|
||||
|
|
|
@ -5,11 +5,12 @@ SELECT attrelid::regclass, attname, atthasmissing, attmissingval
|
|||
FROM pg_attribute
|
||||
WHERE atthasmissing
|
||||
ORDER BY attrelid, attname;
|
||||
attrelid | attname | atthasmissing | attmissingval
|
||||
--------------+-------------+---------------+---------------
|
||||
pg_dist_node | hasmetadata | t | {f}
|
||||
pg_dist_node | isactive | t | {t}
|
||||
pg_dist_node | nodecluster | t | {default}
|
||||
pg_dist_node | noderole | t | {primary}
|
||||
(4 rows)
|
||||
attrelid | attname | atthasmissing | attmissingval
|
||||
--------------+----------------+---------------+---------------
|
||||
pg_dist_node | hasmetadata | t | {f}
|
||||
pg_dist_node | isactive | t | {t}
|
||||
pg_dist_node | metadatasynced | t | {f}
|
||||
pg_dist_node | nodecluster | t | {default}
|
||||
pg_dist_node | noderole | t | {primary}
|
||||
(5 rows)
|
||||
|
||||
|
|
|
@ -27,9 +27,9 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
|
|||
-- Show that, with no MX tables, metadata snapshot contains only the delete commands,
|
||||
-- pg_dist_node entries and reference tables
|
||||
SELECT unnest(master_metadata_snapshot()) order by 1;
|
||||
unnest
|
||||
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
|
||||
unnest
|
||||
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
|
||||
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
|
||||
TRUNCATE pg_dist_node CASCADE
|
||||
(3 rows)
|
||||
|
@ -60,7 +60,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
ALTER TABLE public.mx_test_table OWNER TO postgres
|
||||
ALTER TABLE public.mx_test_table OWNER TO postgres
|
||||
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -81,7 +81,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE public.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -105,7 +105,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -133,7 +133,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -154,7 +154,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
@ -233,12 +233,12 @@ SELECT * FROM pg_dist_local_group;
(1 row)

SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default
2 | 2 | localhost | 57638 | default | f | t | primary | default
4 | 1 | localhost | 8888 | default | f | t | secondary | default
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | f
2 | 2 | localhost | 57638 | default | f | t | primary | default | f
4 | 1 | localhost | 8888 | default | f | t | secondary | default | f
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f
(4 rows)

SELECT * FROM pg_dist_partition ORDER BY logicalrelid;

@ -372,12 +372,12 @@ SELECT * FROM pg_dist_local_group;
(1 row)

SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default
2 | 2 | localhost | 57638 | default | f | t | primary | default
4 | 1 | localhost | 8888 | default | f | t | secondary | default
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t
2 | 2 | localhost | 57638 | default | f | t | primary | default | f
4 | 1 | localhost | 8888 | default | f | t | secondary | default | f
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f
(4 rows)

SELECT * FROM pg_dist_partition ORDER BY logicalrelid;

@ -0,0 +1,373 @@
-- Test creation of mx tables and metadata syncing
SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
SELECT nextval('pg_catalog.pg_dist_shardid_seq') AS last_shard_id \gset
SET citus.replication_model TO streaming;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
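The two ALTER SYSTEM settings above only take effect once pg_reload_conf() succeeds. As a hedged aside (not part of the expected output), the reloaded values could be inspected with a query like the one below; it relies only on the standard pg_settings view and the GUC names used above:

-- illustrative only, not part of the test
SELECT name, setting
FROM pg_settings
WHERE name IN ('citus.metadata_sync_interval', 'citus.metadata_sync_retry_interval');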

CREATE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
  RETURNS void
  LANGUAGE C STRICT
  AS 'citus';
-- Verifies that pg_dist_node and pg_dist_placement in the given worker match the ones in the coordinator
CREATE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
  RETURNS BOOLEAN
  LANGUAGE sql
  AS $$
  WITH dist_node_summary AS (
    SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
  ), dist_node_check AS (
    SELECT count(distinct result) = 1 AS matches
    FROM dist_node_summary CROSS JOIN LATERAL
      master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
                           ARRAY[dist_node_summary.query, dist_node_summary.query],
                           false)
  ), dist_placement_summary AS (
    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
  ), dist_placement_check AS (
    SELECT count(distinct result) = 1 AS matches
    FROM dist_placement_summary CROSS JOIN LATERAL
      master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
                           ARRAY[dist_placement_summary.query, dist_placement_summary.query],
                           false)
  )
  SELECT dist_node_check.matches AND dist_placement_check.matches
  FROM dist_node_check CROSS JOIN dist_placement_check
$$;
-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
  RETURNS TEXT
  LANGUAGE sql
  AS $$
  SELECT master_run_on_worker(ARRAY[hostname], ARRAY[port],
                              ARRAY['ALTER SYSTEM SET default_transaction_read_only TO ' || isreadonly::TEXT], false);
  SELECT result FROM
    master_run_on_worker(ARRAY[hostname], ARRAY[port],
                         ARRAY['SELECT pg_reload_conf()'], false);
$$;
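wait_until_metadata_sync() itself is a C function shipped in the citus library, so its body is not visible here. Purely as an illustration of the condition the test waits for, a rough plpgsql approximation could look like the sketch below; the function name and polling loop are assumptions, only the hasmetadata/metadatasynced columns come from the diff itself:

-- hypothetical helper, not part of the test suite
CREATE OR REPLACE FUNCTION wait_until_metadata_sync_approx(timeout_ms integer DEFAULT 15000)
RETURNS void LANGUAGE plpgsql AS $sketch$
DECLARE
    deadline timestamptz := clock_timestamp() + make_interval(secs => timeout_ms / 1000.0);
BEGIN
    -- poll until every node that has metadata is also marked as synced
    WHILE EXISTS (SELECT 1 FROM pg_dist_node WHERE hasmetadata AND NOT metadatasynced) LOOP
        IF clock_timestamp() > deadline THEN
            RAISE NOTICE 'timed out waiting for metadata sync';
            EXIT;
        END IF;
        PERFORM pg_sleep(0.1);
    END LOOP;
END;
$sketch$;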
-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57637 | f | f
(1 row)

-- create a couple of tables
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');
create_reference_table
------------------------

(1 row)

CREATE TABLE dist_table_1(a int primary key, b int references ref_table(a));
SELECT create_distributed_table('dist_table_1', 'a');
create_distributed_table
--------------------------

(1 row)

-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
                                 'localhost', :worker_2_port);
?column?
----------
1
(1 row)

SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57638 | f | f
(1 row)

-- start syncing metadata to the node
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)

SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57638 | t | t
(1 row)

--------------------------------------------------------------------------
-- Test that maintenance daemon syncs after master_update_node
--------------------------------------------------------------------------
-- Update the node again. We do this in a repeatable read transaction, so we only see the
-- changes made by master_update_node(). This is to avoid inconsistent results
-- if the maintenance daemon does the metadata sync too fast.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57638 | t | t
(1 row)

SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)

SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57637 | t | f
(1 row)

END;
-- wait until maintenance daemon does the next metadata sync, and then
-- check if metadata is synced again
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------

(1 row)

SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | t
(1 row)

SELECT verify_metadata('localhost', :worker_1_port);
verify_metadata
-----------------
t
(1 row)

-- Update the node to a non-existent node. This is to simulate updating to
-- an unwritable node.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 57637 | t | t
(1 row)

SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
?column?
----------
1
(1 row)

SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | nodename | nodeport | hasmetadata | metadatasynced
--------+-----------+----------+-------------+----------------
2 | localhost | 12345 | t | f
(1 row)

END;
-- maintenance daemon metadata sync should fail, because the node is still unwritable.
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------

(1 row)

SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
(1 row)

-- update it back to :worker_1_port, now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)

SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------

(1 row)

SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | t
(1 row)

--------------------------------------------------------------------------
-- Test updating a node when another node is in readonly-mode
--------------------------------------------------------------------------
SELECT FROM master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
NOTICE: Replicating reference table "ref_table" to the node localhost:57638
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)

-- Create a table with shards on both nodes
CREATE TABLE dist_table_2(a int);
SELECT create_distributed_table('dist_table_2', 'a');
create_distributed_table
--------------------------

(1 row)

INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i;
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
mark_node_readonly
--------------------
t
(1 row)

-- Now updating the other node should try syncing to worker 2, but instead of
-- failing, it should just warn and mark the readonly node as not synced.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
WARNING: cannot execute DELETE in a read-only transaction
CONTEXT: while executing command on localhost:57638
?column?
----------
1
(1 row)

SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
3 | t | f
(2 rows)

-- worker_2 is out of sync, so further updates aren't sent to it and
-- we shouldn't see the warnings.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456);
?column?
----------
1
(1 row)

SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
3 | t | f
(2 rows)

-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
mark_node_readonly
--------------------
t
(1 row)

SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------

(1 row)

-- Mark the node readonly again, so the following master_update_node warns
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
mark_node_readonly
--------------------
t
(1 row)

-- Revert the nodeport of worker 1, metadata propagation to worker 2 should
-- still fail, but after the failure, we should still be able to read from
-- worker 2 in the same transaction!
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
WARNING: cannot execute DELETE in a read-only transaction
CONTEXT: while executing command on localhost:57638
?column?
----------
1
(1 row)

SELECT count(*) FROM dist_table_2;
count
-------
100
(1 row)

END;
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------

(1 row)

-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
mark_node_readonly
--------------------
t
(1 row)

SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------

(1 row)

SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)

SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)

--------------------------------------------------------------------------
-- Test that master_update_node rolls back properly
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
?column?
----------
1
(1 row)

ROLLBACK;
SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)

-- cleanup
DROP TABLE dist_table_1, ref_table, dist_table_2;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
count
-------
2
(1 row)

ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART :last_shard_id;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.replication_model;

@ -23,11 +23,11 @@ INSERT INTO dest_table (a, b) VALUES (1, 1);
INSERT INTO dest_table (a, b) VALUES (2, 1);
INSERT INTO source_table (a, b) VALUES (10, 10);
-- simulate actually having secondary nodes
SELECT * FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
1 | 1 | localhost | 57637 | default | f | t | primary | default
2 | 2 | localhost | 57638 | default | f | t | primary | default
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+----------+----------+-------------
1 | 1 | localhost | 57637 | default | t | primary | default
2 | 2 | localhost | 57638 | default | t | primary | default
(2 rows)

UPDATE pg_dist_node SET noderole = 'secondary';

@ -14,6 +14,7 @@
# Tests around schema changes; these are run first, so there are no preexisting objects.
# ---
test: multi_extension
test: multi_mx_master_update_node
test: multi_cluster_management
test: multi_test_helpers

@ -70,6 +70,20 @@ step "s2-update-node-2"
58638);
}

step "s2-verify-metadata"
{
    SELECT nodeid, groupid, nodename, nodeport FROM pg_dist_node ORDER BY nodeid;
    SELECT master_run_on_worker(
        ARRAY['localhost'], ARRAY[57638],
        ARRAY['SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport) ORDER BY nodeid) FROM pg_dist_node'],
        false);
}

step "s2-start-metadata-sync-node-2"
{
    SELECT start_metadata_sync_to_node('localhost', 57638);
}

step "s2-abort"
{
    ABORT;
@ -85,3 +99,8 @@ permutation "s1-begin" "s1-update-node-1" "s2-update-node-2" "s1-commit" "s1-sho

# session 1 updates node 1, session 2 tries to do the same
permutation "s1-begin" "s1-update-node-1" "s2-begin" "s2-update-node-1" "s1-commit" "s2-abort" "s1-show-nodes"

# master_update_node should block start_metadata_sync_to_node. Note that we
# cannot run start_metadata_sync_to_node in a transaction, so we're not
# testing the reverse order here.
permutation "s1-begin" "s1-update-node-1" "s2-start-metadata-sync-node-2" "s1-commit" "s2-verify-metadata"
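The last permutation relies on master_update_node() holding a lock that start_metadata_sync_to_node() also needs, so the second session blocks until the first commits. As a purely illustrative aside (not part of the spec), a third session could watch that blocking through the standard pg_stat_activity view:

-- illustrative only, not part of the isolation spec
SELECT pid, wait_event_type, wait_event, query
FROM pg_stat_activity
WHERE query ILIKE '%start_metadata_sync_to_node%'
  AND wait_event_type = 'Lock';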
@ -134,9 +134,9 @@ SELECT master_remove_node('localhost', 9990);
-- clean-up
DROP TABLE cluster_management_test;

-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
-- check that adding/removing nodes are propagated to nodes with metadata
SELECT master_remove_node('localhost', :worker_2_port);
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;

@ -146,8 +146,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port

-- check that added nodes are not propagated to nodes with hasmetadata=false
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
-- check that added nodes are not propagated to nodes without metadata
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;

@ -174,7 +174,7 @@ COMMIT;

SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;

UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT master_remove_node('localhost', :worker_2_port);

@ -0,0 +1,191 @@
-- Test creation of mx tables and metadata syncing

SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
SELECT nextval('pg_catalog.pg_dist_shardid_seq') AS last_shard_id \gset

SET citus.replication_model TO streaming;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;

-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();

CREATE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
  RETURNS void
  LANGUAGE C STRICT
  AS 'citus';

-- Verifies that pg_dist_node and pg_dist_placement in the given worker match the ones in the coordinator
CREATE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
  RETURNS BOOLEAN
  LANGUAGE sql
  AS $$
  WITH dist_node_summary AS (
    SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
  ), dist_node_check AS (
    SELECT count(distinct result) = 1 AS matches
    FROM dist_node_summary CROSS JOIN LATERAL
      master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
                           ARRAY[dist_node_summary.query, dist_node_summary.query],
                           false)
  ), dist_placement_summary AS (
    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
  ), dist_placement_check AS (
    SELECT count(distinct result) = 1 AS matches
    FROM dist_placement_summary CROSS JOIN LATERAL
      master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
                           ARRAY[dist_placement_summary.query, dist_placement_summary.query],
                           false)
  )
  SELECT dist_node_check.matches AND dist_placement_check.matches
  FROM dist_node_check CROSS JOIN dist_placement_check
$$;

-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
  RETURNS TEXT
  LANGUAGE sql
  AS $$
  SELECT master_run_on_worker(ARRAY[hostname], ARRAY[port],
                              ARRAY['ALTER SYSTEM SET default_transaction_read_only TO ' || isreadonly::TEXT], false);
  SELECT result FROM
    master_run_on_worker(ARRAY[hostname], ARRAY[port],
                         ARRAY['SELECT pg_reload_conf()'], false);
$$;

-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

-- create a couple of tables
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');

CREATE TABLE dist_table_1(a int primary key, b int references ref_table(a));
SELECT create_distributed_table('dist_table_1', 'a');

-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
                                 'localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

-- start syncing metadata to the node
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

--------------------------------------------------------------------------
-- Test that maintenance daemon syncs after master_update_node
--------------------------------------------------------------------------

-- Update the node again. We do this in a repeatable read transaction, so we only see the
-- changes made by master_update_node(). This is to avoid inconsistent results
-- if the maintenance daemon does the metadata sync too fast.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;

-- wait until maintenance daemon does the next metadata sync, and then
-- check if metadata is synced again
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

SELECT verify_metadata('localhost', :worker_1_port);

-- Update the node to a non-existent node. This is to simulate updating to
-- an unwritable node.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;

-- maintenance daemon metadata sync should fail, because the node is still unwritable.
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

-- update it back to :worker_1_port, now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

--------------------------------------------------------------------------
-- Test updating a node when another node is in readonly-mode
--------------------------------------------------------------------------

SELECT FROM master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);

-- Create a table with shards on both nodes
CREATE TABLE dist_table_2(a int);
SELECT create_distributed_table('dist_table_2', 'a');
INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i;

SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);

-- Now updating the other node should try syncing to worker 2, but instead of
-- failing, it should just warn and mark the readonly node as not synced.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

-- worker_2 is out of sync, so further updates aren't sent to it and
-- we shouldn't see the warnings.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync();

-- Mark the node readonly again, so the following master_update_node warns
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);

-- Revert the nodeport of worker 1, metadata propagation to worker 2 should
-- still fail, but after the failure, we should still be able to read from
-- worker 2 in the same transaction!
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT count(*) FROM dist_table_2;
END;

SELECT wait_until_metadata_sync();

-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync();

SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

--------------------------------------------------------------------------
-- Test that master_update_node rolls back properly
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
ROLLBACK;

SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

-- cleanup
DROP TABLE dist_table_1, ref_table, dist_table_2;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART :last_shard_id;

RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.replication_model;

@ -19,7 +19,7 @@ INSERT INTO dest_table (a, b) VALUES (2, 1);
INSERT INTO source_table (a, b) VALUES (10, 10);

-- simulate actually having secondary nodes
SELECT * FROM pg_dist_node;
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
UPDATE pg_dist_node SET noderole = 'secondary';

\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'"