diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 6cce0688b..8f5ebae97 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -169,6 +169,7 @@ typedef struct MetadataCacheData Oid citusTableIsVisibleFuncId; Oid relationIsAKnownShardFuncId; Oid jsonbExtractPathFuncId; + Oid jsonbExtractPathTextFuncId; bool databaseNameValid; char databaseName[NAMEDATALEN]; } MetadataCacheData; @@ -2726,6 +2727,24 @@ JsonbExtractPathFuncId(void) } +/* + * JsonbExtractPathTextFuncId returns oid of the jsonb_extract_path_text function. + */ +Oid +JsonbExtractPathTextFuncId(void) +{ + if (MetadataCache.jsonbExtractPathTextFuncId == InvalidOid) + { + const int argCount = 2; + + MetadataCache.jsonbExtractPathTextFuncId = + FunctionOid("pg_catalog", "jsonb_extract_path_text", argCount); + } + + return MetadataCache.jsonbExtractPathTextFuncId; +} + + /* * CurrentDatabaseName gets the name of the current database and caches * the result. diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index f9f070166..6f80558e8 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -275,6 +275,23 @@ citus_add_node(PG_FUNCTION_ARGS) */ if (!nodeAlreadyExists) { + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort); + + /* + * If the worker is not marked as a coordinator, check that + * the node is not trying to add itself + */ + if (workerNode != NULL && + workerNode->groupId != COORDINATOR_GROUP_ID && + IsWorkerTheCurrentNode(workerNode)) + { + ereport(ERROR, (errmsg("Node cannot add itself as a worker."), + errhint( + "Add the node as a coordinator by using: " + "SELECT citus_set_coordinator_host('%s', %d);", + nodeNameString, nodePort))); + } + ActivateNode(nodeNameString, nodePort); } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index bca9bbaa1..2aba411eb 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -46,9 +46,9 @@ #include "distributed/placement_connection.h" #include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" -#include "distributed/listutils.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" +#include "distributed/jsonbutils.h" #include "executor/tstoreReceiver.h" #include "fmgr.h" #include "lib/stringinfo.h" @@ -143,10 +143,8 @@ static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, QueryEnvironment *queryEnv, const instr_time *planduration, double *executionDurationMillisec); -static bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue); static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat defaultValue); -static bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result); static TupleDestination * CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest); static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, @@ -1112,25 +1110,6 @@ FreeSavedExplainPlan(void) } -/* - * ExtractFieldBoolean gets value of fieldName from jsonbDoc, or returns - * defaultValue if it doesn't exist. - */ -static bool -ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue) -{ - Datum jsonbDatum = 0; - bool found = ExtractFieldJsonbDatum(jsonbDoc, fieldName, &jsonbDatum); - if (!found) - { - return defaultValue; - } - - Datum boolDatum = DirectFunctionCall1(jsonb_bool, jsonbDatum); - return DatumGetBool(boolDatum); -} - - /* * ExtractFieldExplainFormat gets value of fieldName from jsonbDoc, or returns * defaultValue if it doesn't exist. @@ -1169,50 +1148,6 @@ ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat } -/* - * ExtractFieldJsonbDatum gets value of fieldName from jsonbDoc and puts it - * into result. If not found, returns false. Otherwise, returns true. - */ -static bool -ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result) -{ - Datum pathArray[1] = { CStringGetTextDatum(fieldName) }; - bool pathNulls[1] = { false }; - bool typeByValue = false; - char typeAlignment = 0; - int16 typeLength = 0; - int dimensions[1] = { 1 }; - int lowerbounds[1] = { 1 }; - - get_typlenbyvalalign(TEXTOID, &typeLength, &typeByValue, &typeAlignment); - - ArrayType *pathArrayObject = construct_md_array(pathArray, pathNulls, 1, dimensions, - lowerbounds, TEXTOID, typeLength, - typeByValue, typeAlignment); - Datum pathDatum = PointerGetDatum(pathArrayObject); - - /* - * We need to check whether the result of jsonb_extract_path is NULL or not, so use - * FunctionCallInvoke() instead of other function call api. - * - * We cannot use jsonb_path_exists to ensure not-null since it is not available in - * postgres 11. - */ - FmgrInfo fmgrInfo; - fmgr_info(JsonbExtractPathFuncId(), &fmgrInfo); - - LOCAL_FCINFO(functionCallInfo, 2); - InitFunctionCallInfoData(*functionCallInfo, &fmgrInfo, 2, DEFAULT_COLLATION_OID, NULL, - NULL); - - fcSetArg(functionCallInfo, 0, jsonbDoc); - fcSetArg(functionCallInfo, 1, pathDatum); - - *result = FunctionCallInvoke(functionCallInfo); - return !functionCallInfo->isnull; -} - - /* * CitusExplainOneQuery is the executor hook that is called when * postgres wants to explain a query. diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index e94abed53..2878cae86 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -30,8 +30,9 @@ #include "distributed/transaction_recovery.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "distributed/jsonbutils.h" #include "utils/memutils.h" - +#include "utils/builtins.h" static void SendCommandToMetadataWorkersParams(const char *command, const char *user, int parameterCount, @@ -639,3 +640,58 @@ ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList) } } } + + +/* + * IsWorkerTheCurrentNode checks if the given worker refers to the + * the current node by comparing the server id of the worker and of the + * current nodefrom pg_dist_node_metadata + */ +bool +IsWorkerTheCurrentNode(WorkerNode *workerNode) +{ + int connectionFlags = REQUIRE_METADATA_CONNECTION; + + MultiConnection *workerConnection = + GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + CurrentUserName(), + NULL); + const char *command = + "SELECT metadata ->> 'server_id' AS server_id FROM pg_dist_node_metadata"; + + SendRemoteCommand(workerConnection, command); + + PGresult *result = GetRemoteCommandResult(workerConnection, true); + + if (result == NULL) + { + return false; + } + + List *commandResult = ReadFirstColumnAsText(result); + + PQclear(result); + ForgetResults(workerConnection); + + if ((list_length(commandResult) != 1)) + { + return false; + } + + StringInfo resultInfo = (StringInfo) linitial(commandResult); + char *workerServerId = resultInfo->data; + + Datum metadata = DistNodeMetadata(); + text *currentServerIdTextP = ExtractFieldTextP(metadata, "server_id"); + + if (currentServerIdTextP == NULL) + { + return false; + } + + char *currentServerId = text_to_cstring(currentServerIdTextP); + + return strcmp(workerServerId, currentServerId) == 0; +} diff --git a/src/backend/distributed/utils/jsonbutils.c b/src/backend/distributed/utils/jsonbutils.c new file mode 100644 index 000000000..22fa4f568 --- /dev/null +++ b/src/backend/distributed/utils/jsonbutils.c @@ -0,0 +1,113 @@ +#include "postgres.h" + +#include "pg_version_compat.h" + +#include "catalog/namespace.h" +#include "catalog/pg_class.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_type.h" + +#include "utils/array.h" +#include "utils/json.h" +#include "distributed/jsonbutils.h" +#include "distributed/metadata_cache.h" + +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "fmgr.h" + + +/* + * ExtractFieldJsonb gets value of fieldName from jsonbDoc and puts it + * into result. If not found, returns false. Otherwise, returns true. + * The field is returned as a Text* Datum if as_text is true, or a Jsonb* + * Datum if as_text is false. + */ +static bool +ExtractFieldJsonb(Datum jsonbDoc, const char *fieldName, Datum *result, bool as_text) +{ + Datum pathArray[1] = { CStringGetTextDatum(fieldName) }; + bool pathNulls[1] = { false }; + bool typeByValue = false; + char typeAlignment = 0; + int16 typeLength = 0; + int dimensions[1] = { 1 }; + int lowerbounds[1] = { 1 }; + + get_typlenbyvalalign(TEXTOID, &typeLength, &typeByValue, &typeAlignment); + + ArrayType *pathArrayObject = construct_md_array(pathArray, pathNulls, 1, dimensions, + lowerbounds, TEXTOID, typeLength, + typeByValue, typeAlignment); + Datum pathDatum = PointerGetDatum(pathArrayObject); + + FmgrInfo fmgrInfo; + + if (as_text) + { + fmgr_info(JsonbExtractPathTextFuncId(), &fmgrInfo); + } + else + { + fmgr_info(JsonbExtractPathFuncId(), &fmgrInfo); + } + + LOCAL_FCINFO(functionCallInfo, 2); + InitFunctionCallInfoData(*functionCallInfo, &fmgrInfo, 2, DEFAULT_COLLATION_OID, NULL, + NULL); + + fcSetArg(functionCallInfo, 0, jsonbDoc); + fcSetArg(functionCallInfo, 1, pathDatum); + + *result = FunctionCallInvoke(functionCallInfo); + return !functionCallInfo->isnull; +} + + +/* + * ExtractFieldBoolean gets value of fieldName from jsonbDoc, or returns + * defaultValue if it doesn't exist. + */ +bool +ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue) +{ + Datum jsonbDatum = 0; + bool found = ExtractFieldJsonb(jsonbDoc, fieldName, &jsonbDatum, false); + if (!found) + { + return defaultValue; + } + + Datum boolDatum = DirectFunctionCall1(jsonb_bool, jsonbDatum); + return DatumGetBool(boolDatum); +} + + +/* + * ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or + * returns NULL if it doesn't exist. + */ +text * +ExtractFieldTextP(Datum jsonbDoc, const char *fieldName) +{ + Datum jsonbDatum = 0; + + bool found = ExtractFieldJsonb(jsonbDoc, fieldName, &jsonbDatum, true); + if (!found) + { + return NULL; + } + + return DatumGetTextP(jsonbDatum); +} + + +/* + * ExtractFieldJsonbDatum gets value of fieldName from jsonbDoc and puts it + * into result. If not found, returns false. Otherwise, returns true. + */ +bool +ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result) +{ + return ExtractFieldJsonb(jsonbDoc, fieldName, result, false); +} diff --git a/src/include/distributed/jsonbutils.h b/src/include/distributed/jsonbutils.h new file mode 100644 index 000000000..3e37fa38e --- /dev/null +++ b/src/include/distributed/jsonbutils.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * jsonbutils.h + * + * Declarations for public utility functions related to jsonb. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#ifndef CITUS_JSONBUTILS_H +#define CITUS_JSONBUTILS_H + +#include "postgres.h" + +bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result); +text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName); +bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue); + +#endif /* CITUS_JSONBUTILS_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 46ba72a49..e190aef6f 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -256,6 +256,7 @@ extern Oid PgTableVisibleFuncId(void); extern Oid CitusTableVisibleFuncId(void); extern Oid RelationIsAKnownShardFuncId(void); extern Oid JsonbExtractPathFuncId(void); +extern Oid JsonbExtractPathTextFuncId(void); /* enum oids */ extern Oid PrimaryNodeRoleId(void); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index c3748ee5b..6cb7d8bce 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -70,4 +70,6 @@ extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort); /* helper functions for worker transactions */ extern bool IsWorkerTransactionActive(void); +extern bool IsWorkerTheCurrentNode(WorkerNode *workerNode); + #endif /* WORKER_TRANSACTION_H */ diff --git a/src/test/regress/expected/add_coordinator.out b/src/test/regress/expected/add_coordinator.out index 51808515b..01f3a682d 100644 --- a/src/test/regress/expected/add_coordinator.out +++ b/src/test/regress/expected/add_coordinator.out @@ -1,6 +1,10 @@ -- -- ADD_COORDINATOR -- +-- node trying to add itself without specifying groupid => 0 should error out +SELECT master_add_node('localhost', :master_port); +ERROR: Node cannot add itself as a worker. +HINT: Add the node as a coordinator by using: SELECT citus_set_coordinator_host('localhost', 57636); SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata -- adding the same node again should return the existing nodeid diff --git a/src/test/regress/sql/add_coordinator.sql b/src/test/regress/sql/add_coordinator.sql index eb5e37778..2dba78064 100644 --- a/src/test/regress/sql/add_coordinator.sql +++ b/src/test/regress/sql/add_coordinator.sql @@ -2,6 +2,9 @@ -- ADD_COORDINATOR -- +-- node trying to add itself without specifying groupid => 0 should error out +SELECT master_add_node('localhost', :master_port); + SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset -- adding the same node again should return the existing nodeid