Fix node adding itself with citus_add_node leading to deadlock (Fix #5720) (#5758)

If a worker node is being added, a command is sent to the worker to read its server_id from the pg_dist_node_metadata table. If the worker's server_id is the same as the server_id of the node executing the code, we know the node is trying to add itself. If the node tries to add itself without specifying `groupid:=0`, the operation will result in an error.
pull/5776/head^2
Gledis Zeneli 2022-03-10 17:46:33 +03:00 committed by GitHub
parent 2a7a5da526
commit 2cb02bfb56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 237 additions and 67 deletions

View File

@ -169,6 +169,7 @@ typedef struct MetadataCacheData
Oid citusTableIsVisibleFuncId;
Oid relationIsAKnownShardFuncId;
Oid jsonbExtractPathFuncId;
Oid jsonbExtractPathTextFuncId;
bool databaseNameValid;
char databaseName[NAMEDATALEN];
} MetadataCacheData;
@ -2726,6 +2727,24 @@ JsonbExtractPathFuncId(void)
}
/*
 * JsonbExtractPathTextFuncId returns the oid of the two-argument
 * pg_catalog.jsonb_extract_path_text function. The oid is looked up while
 * the cached value is still InvalidOid and is served from MetadataCache
 * afterwards.
 */
Oid
JsonbExtractPathTextFuncId(void)
{
	if (MetadataCache.jsonbExtractPathTextFuncId != InvalidOid)
	{
		/* already resolved, no catalog lookup needed */
		return MetadataCache.jsonbExtractPathTextFuncId;
	}

	const int argCount = 2;
	MetadataCache.jsonbExtractPathTextFuncId =
		FunctionOid("pg_catalog", "jsonb_extract_path_text", argCount);

	return MetadataCache.jsonbExtractPathTextFuncId;
}
/*
* CurrentDatabaseName gets the name of the current database and caches
* the result.

View File

@ -275,6 +275,23 @@ citus_add_node(PG_FUNCTION_ARGS)
*/
if (!nodeAlreadyExists)
{
WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort);
/*
* If the worker is not marked as a coordinator, check that
* the node is not trying to add itself
*/
if (workerNode != NULL &&
workerNode->groupId != COORDINATOR_GROUP_ID &&
IsWorkerTheCurrentNode(workerNode))
{
ereport(ERROR, (errmsg("Node cannot add itself as a worker."),
errhint(
"Add the node as a coordinator by using: "
"SELECT citus_set_coordinator_host('%s', %d);",
nodeNameString, nodePort)));
}
ActivateNode(nodeNameString, nodePort);
}

View File

@ -46,9 +46,9 @@
#include "distributed/placement_connection.h"
#include "distributed/tuple_destination.h"
#include "distributed/tuplestore.h"
#include "distributed/listutils.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "distributed/jsonbutils.h"
#include "executor/tstoreReceiver.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
@ -143,10 +143,8 @@ static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
QueryEnvironment *queryEnv,
const instr_time *planduration,
double *executionDurationMillisec);
static bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue);
static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName,
ExplainFormat defaultValue);
static bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result);
static TupleDestination * CreateExplainAnlyzeDestination(Task *task,
TupleDestination *taskDest);
static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
@ -1112,25 +1110,6 @@ FreeSavedExplainPlan(void)
}
/*
 * ExtractFieldBoolean looks up fieldName in jsonbDoc and returns its value
 * converted to a bool. If the field cannot be found, defaultValue is
 * returned instead.
 */
static bool
ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue)
{
	Datum fieldDatum = 0;

	if (ExtractFieldJsonbDatum(jsonbDoc, fieldName, &fieldDatum))
	{
		/* field exists: let jsonb_bool turn the jsonb value into a bool */
		return DatumGetBool(DirectFunctionCall1(jsonb_bool, fieldDatum));
	}

	return defaultValue;
}
/*
* ExtractFieldExplainFormat gets value of fieldName from jsonbDoc, or returns
* defaultValue if it doesn't exist.
@ -1169,50 +1148,6 @@ ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat
}
/*
 * ExtractFieldJsonbDatum extracts the value stored under fieldName in
 * jsonbDoc and writes it to *result. Returns true when the extraction
 * yielded a non-NULL value, and false otherwise.
 */
static bool
ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result)
{
	/* a single, non-null path element holding the field name */
	Datum pathElements[1] = { CStringGetTextDatum(fieldName) };
	bool pathElementNulls[1] = { false };

	/* one dimension of length 1, with lower bound 1 */
	int pathDimensions[1] = { 1 };
	int pathLowerBounds[1] = { 1 };

	/* storage properties of the text type, needed to build the array */
	int16 textLength = 0;
	bool textByValue = false;
	char textAlignment = 0;
	get_typlenbyvalalign(TEXTOID, &textLength, &textByValue, &textAlignment);

	ArrayType *pathArray = construct_md_array(pathElements, pathElementNulls, 1,
											  pathDimensions, pathLowerBounds,
											  TEXTOID, textLength, textByValue,
											  textAlignment);

	/*
	 * Whether jsonb_extract_path returned NULL is what tells us if the field
	 * exists, so call it through FunctionCallInvoke(), which exposes the
	 * isnull flag, rather than through one of the simpler call wrappers.
	 *
	 * jsonb_path_exists is not an alternative for the not-null check since
	 * it is not available in postgres 11.
	 */
	FmgrInfo extractFunction;
	fmgr_info(JsonbExtractPathFuncId(), &extractFunction);

	LOCAL_FCINFO(extractCall, 2);
	InitFunctionCallInfoData(*extractCall, &extractFunction, 2,
							 DEFAULT_COLLATION_OID, NULL, NULL);
	fcSetArg(extractCall, 0, jsonbDoc);
	fcSetArg(extractCall, 1, PointerGetDatum(pathArray));

	*result = FunctionCallInvoke(extractCall);

	return !extractCall->isnull;
}
/*
* CitusExplainOneQuery is the executor hook that is called when
* postgres wants to explain a query.

View File

@ -30,8 +30,9 @@
#include "distributed/transaction_recovery.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "distributed/jsonbutils.h"
#include "utils/memutils.h"
#include "utils/builtins.h"
static void SendCommandToMetadataWorkersParams(const char *command,
const char *user, int parameterCount,
@ -639,3 +640,58 @@ ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList)
}
}
}
/*
 * IsWorkerTheCurrentNode checks if the given worker refers to the current
 * node. It does so by comparing the server_id the worker reports from its
 * pg_dist_node_metadata with the server_id found in the metadata returned
 * by DistNodeMetadata() on the current node.
 *
 * Returns true only when both server ids are available and equal. Returns
 * false when they differ and also whenever the comparison cannot be made:
 * the command could not be sent, no result came back, the result does not
 * consist of exactly one row, or the local metadata has no server_id field.
 */
bool
IsWorkerTheCurrentNode(WorkerNode *workerNode)
{
	int connectionFlags = REQUIRE_METADATA_CONNECTION;

	MultiConnection *workerConnection =
		GetNodeUserDatabaseConnection(connectionFlags,
									  workerNode->workerName,
									  workerNode->workerPort,
									  CurrentUserName(),
									  NULL);

	const char *command =
		"SELECT metadata ->> 'server_id' AS server_id FROM pg_dist_node_metadata";

	/*
	 * SendRemoteCommand reports failure with 0. If the command never made it
	 * to the worker there is no result to wait for, so drop the connection
	 * instead of reading from it.
	 */
	int querySent = SendRemoteCommand(workerConnection, command);
	if (querySent == 0)
	{
		CloseConnection(workerConnection);
		return false;
	}

	PGresult *result = GetRemoteCommandResult(workerConnection, true);
	if (result == NULL)
	{
		return false;
	}

	List *commandResult = ReadFirstColumnAsText(result);

	/* the rows are copied into commandResult, release the libpq result */
	PQclear(result);
	ForgetResults(workerConnection);

	/* anything other than exactly one row cannot be compared */
	if (list_length(commandResult) != 1)
	{
		return false;
	}

	StringInfo resultInfo = (StringInfo) linitial(commandResult);
	char *workerServerId = resultInfo->data;

	Datum metadata = DistNodeMetadata();
	text *currentServerIdTextP = ExtractFieldTextP(metadata, "server_id");
	if (currentServerIdTextP == NULL)
	{
		return false;
	}

	char *currentServerId = text_to_cstring(currentServerIdTextP);

	return strcmp(workerServerId, currentServerId) == 0;
}

View File

@ -0,0 +1,113 @@
#include "postgres.h"
#include "pg_version_compat.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/json.h"
#include "distributed/jsonbutils.h"
#include "distributed/metadata_cache.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "fmgr.h"
/*
 * ExtractFieldJsonb extracts the value stored under fieldName in jsonbDoc
 * and writes it to *result. Returns true when the extraction yielded a
 * non-NULL value, and false otherwise.
 *
 * as_text selects the extraction function: when true the function behind
 * JsonbExtractPathTextFuncId() is called and *result is a text * Datum,
 * when false the function behind JsonbExtractPathFuncId() is called and
 * *result is a Jsonb * Datum.
 */
static bool
ExtractFieldJsonb(Datum jsonbDoc, const char *fieldName, Datum *result, bool as_text)
{
	/* a single, non-null path element holding the field name */
	Datum pathElements[1] = { CStringGetTextDatum(fieldName) };
	bool pathElementNulls[1] = { false };

	/* one dimension of length 1, with lower bound 1 */
	int pathDimensions[1] = { 1 };
	int pathLowerBounds[1] = { 1 };

	/* storage properties of the text type, needed to build the array */
	int16 textLength = 0;
	bool textByValue = false;
	char textAlignment = 0;
	get_typlenbyvalalign(TEXTOID, &textLength, &textByValue, &textAlignment);

	ArrayType *pathArray = construct_md_array(pathElements, pathElementNulls, 1,
											  pathDimensions, pathLowerBounds,
											  TEXTOID, textLength, textByValue,
											  textAlignment);

	Oid extractFunctionId = as_text ?
							JsonbExtractPathTextFuncId() :
							JsonbExtractPathFuncId();

	FmgrInfo extractFunction;
	fmgr_info(extractFunctionId, &extractFunction);

	/* FunctionCallInvoke() lets us inspect isnull once the call returns */
	LOCAL_FCINFO(extractCall, 2);
	InitFunctionCallInfoData(*extractCall, &extractFunction, 2,
							 DEFAULT_COLLATION_OID, NULL, NULL);
	fcSetArg(extractCall, 0, jsonbDoc);
	fcSetArg(extractCall, 1, PointerGetDatum(pathArray));

	*result = FunctionCallInvoke(extractCall);

	return !extractCall->isnull;
}
/*
 * ExtractFieldBoolean looks up fieldName in jsonbDoc and returns its value
 * converted to a bool. If the field cannot be found, defaultValue is
 * returned instead.
 */
bool
ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue)
{
	Datum fieldDatum = 0;
	const bool asText = false;

	if (ExtractFieldJsonb(jsonbDoc, fieldName, &fieldDatum, asText))
	{
		/* field exists: let jsonb_bool turn the jsonb value into a bool */
		return DatumGetBool(DirectFunctionCall1(jsonb_bool, fieldDatum));
	}

	return defaultValue;
}
/*
 * ExtractFieldTextP looks up fieldName in jsonbDoc and returns its value as
 * a text pointer. If the field cannot be found, NULL is returned.
 */
text *
ExtractFieldTextP(Datum jsonbDoc, const char *fieldName)
{
	Datum textDatum = 0;
	const bool asText = true;

	if (ExtractFieldJsonb(jsonbDoc, fieldName, &textDatum, asText))
	{
		return DatumGetTextP(textDatum);
	}

	return NULL;
}
/*
 * ExtractFieldJsonbDatum looks up fieldName in jsonbDoc and stores the
 * value, in its jsonb (non-text) form, in *result. Returns true if the
 * field was found and false if it was not.
 */
bool
ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result)
{
	const bool asText = false;
	bool found = ExtractFieldJsonb(jsonbDoc, fieldName, result, asText);

	return found;
}

View File

@ -0,0 +1,20 @@
/*-------------------------------------------------------------------------
*
* jsonbutils.h
*
* Declarations for public utility functions related to jsonb.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_JSONBUTILS_H
#define CITUS_JSONBUTILS_H
#include "postgres.h"
/* stores the jsonb value of fieldName in *result; returns false if it is not found */
bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result);
/* returns the value of fieldName as text *, or NULL if it is not found */
text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName);
/* returns the value of fieldName as a bool, or defaultValue if it is not found */
bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue);
#endif /* CITUS_JSONBUTILS_H */

View File

@ -256,6 +256,7 @@ extern Oid PgTableVisibleFuncId(void);
extern Oid CitusTableVisibleFuncId(void);
extern Oid RelationIsAKnownShardFuncId(void);
extern Oid JsonbExtractPathFuncId(void);
extern Oid JsonbExtractPathTextFuncId(void);
/* enum oids */
extern Oid PrimaryNodeRoleId(void);

View File

@ -70,4 +70,6 @@ extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort);
/* helper functions for worker transactions */
extern bool IsWorkerTransactionActive(void);
extern bool IsWorkerTheCurrentNode(WorkerNode *workerNode);
#endif /* WORKER_TRANSACTION_H */

View File

@ -1,6 +1,10 @@
--
-- ADD_COORDINATOR
--
-- node trying to add itself without specifying groupid => 0 should error out
SELECT master_add_node('localhost', :master_port);
ERROR: Node cannot add itself as a worker.
HINT: Add the node as a coordinator by using: SELECT citus_set_coordinator_host('localhost', 57636);
SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
-- adding the same node again should return the existing nodeid

View File

@ -2,6 +2,9 @@
-- ADD_COORDINATOR
--
-- node trying to add itself without specifying groupid => 0 should error out
SELECT master_add_node('localhost', :master_port);
SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset
-- adding the same node again should return the existing nodeid