Remove citus.worker_list_file & master_initialize_node_metadata

pull/3191/head
Philip Dubé 2019-11-07 22:58:24 +00:00 committed by Philip Dubé
parent 48552bfffe
commit eb35743c3f
8 changed files with 20 additions and 284 deletions

View File

@ -81,7 +81,6 @@ static void RegisterCitusConfigVariables(void);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
GucSource source);
static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source);
static void NormalizeWorkerListPath(void);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
@ -380,18 +379,6 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_MS | GUC_STANDARD,
NULL, NULL, NULL);
/* keeping temporarily for updates from pre-6.0 versions */
DefineCustomStringVariable(
"citus.worker_list_file",
gettext_noop("Sets the server's \"worker_list\" configuration file."),
NULL,
&WorkerListFileName,
NULL,
PGC_POSTMASTER,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
NormalizeWorkerListPath();
DefineCustomIntVariable(
"citus.sslmode",
gettext_noop("This variable has been deprecated. Use the citus.node_conninfo "
@ -1276,9 +1263,6 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
NormalizeWorkerListPath();
/* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus");
}
@ -1326,51 +1310,6 @@ WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source)
}
/*
* NormalizeWorkerListPath converts the path configured via
* citus.worker_list_file into an absolute path, falling back to the default
* value if necessary. The previous value of the config variable is
* overwritten with the normalized value.
*
* NB: This has to be called before ChangeToDataDir() is called as otherwise
* the relative paths won't make much sense to the user anymore.
*/
static void
NormalizeWorkerListPath(void)
{
char *absoluteFileName = NULL;
if (WorkerListFileName != NULL)
{
absoluteFileName = make_absolute_path(WorkerListFileName);
}
else if (DataDir != NULL)
{
absoluteFileName = malloc(strlen(DataDir) + strlen(WORKER_LIST_FILENAME) + 2);
if (absoluteFileName == NULL)
{
ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
sprintf(absoluteFileName, "%s/%s", DataDir, WORKER_LIST_FILENAME);
}
else
{
ereport(FATAL, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s does not know where to find the \"worker_list_file\" "
"configuration file.\n"
"This can be specified as \"citus.worker_list_file\" in "
"\"%s\", or by the -D invocation option, or by the PGDATA "
"environment variable.\n", progname, ConfigFileName)));
}
SetConfigOption("citus.worker_list_file", absoluteFileName, PGC_POSTMASTER,
PGC_S_OVERRIDE);
free(absoluteFileName);
}
/*
* NodeConninfoGucCheckHook ensures conninfo settings are in the expected form
* and that the keywords of all non-null settings are on a whitelist devised to

View File

@ -1,4 +1,4 @@
-- citus--7.0-1.sql
-- citus--7.0-1.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION citus" to load this file. \quit
@ -44,7 +44,7 @@ CREATE TABLE citus.pg_dist_partition(
colocationid integer DEFAULT 0 NOT NULL,
repmodel "char" DEFAULT 'c' NOT NULL
);
-- SELECT granted to PUBLIC in upgrade script
-- SELECT granted to PUBLIC in upgrade script
CREATE UNIQUE INDEX pg_dist_partition_logical_relid_index
ON citus.pg_dist_partition using btree(logicalrelid);
ALTER TABLE citus.pg_dist_partition SET SCHEMA pg_catalog;
@ -62,7 +62,7 @@ CREATE TABLE citus.pg_dist_shard(
-- ALTER-after-CREATE to keep table tuple layout consistent
-- with earlier versions of Citus.
ALTER TABLE citus.pg_dist_shard DROP shardalias;
-- SELECT granted to PUBLIC in upgrade script
-- SELECT granted to PUBLIC in upgrade script
CREATE UNIQUE INDEX pg_dist_shard_shardid_index
ON citus.pg_dist_shard using btree(shardid);
CREATE INDEX pg_dist_shard_logical_relid_index
@ -82,7 +82,7 @@ CREATE TABLE citus.pg_dist_shard_placement(
nodeport int8 NOT NULL,
placementid bigint NOT NULL DEFAULT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq')
);
-- SELECT granted to PUBLIC in upgrade script
-- SELECT granted to PUBLIC in upgrade script
CREATE UNIQUE INDEX pg_dist_shard_placement_placementid_index
ON citus.pg_dist_shard_placement using btree(placementid);
CREATE INDEX pg_dist_shard_placement_shardid_index
@ -105,17 +105,17 @@ ALTER SEQUENCE citus.pg_dist_shardid_seq SET SCHEMA pg_catalog;
-- used to identify jobs in the distributed database; and they wrap at 32-bits
-- to allow for worker nodes to independently execute their distributed jobs.
CREATE SEQUENCE citus.pg_dist_jobid_seq
MINVALUE 2 -- first jobId reserved for clean up jobs
MINVALUE 2 -- first jobId reserved for clean up jobs
MAXVALUE 4294967296;
ALTER SEQUENCE citus.pg_dist_jobid_seq SET SCHEMA pg_catalog;
-- Citus functions
-- For backward compatibility and ease of use create functions et al. in pg_catalog
-- For backward compatibility and ease of use create functions et al. in pg_catalog
SET search_path = 'pg_catalog';
-- master_* functions
-- master_* functions
CREATE FUNCTION master_get_table_metadata(relation_name text, OUT logical_relid oid,
OUT part_storage_type "char",
@ -198,7 +198,7 @@ RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
-- task_tracker_* functions
-- task_tracker_* functions
CREATE FUNCTION task_tracker_assign_task(bigint, integer, text)
RETURNS void
@ -222,7 +222,7 @@ COMMENT ON FUNCTION task_tracker_cleanup_job(bigint)
IS 'clean up all tasks associated with a job';
-- worker_* functions
-- worker_* functions
CREATE FUNCTION worker_fetch_partition_file(bigint, integer, integer, integer, text,
integer)
@ -284,7 +284,7 @@ CREATE FUNCTION master_drop_sequences(sequence_names text[])
COMMENT ON FUNCTION master_drop_sequences(text[])
IS 'drop specified sequences from the cluster';
-- trigger functions
-- trigger functions
CREATE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
@ -347,7 +347,7 @@ COMMENT ON FUNCTION master_dist_shard_cache_invalidate()
IS 'register relcache invalidation for changed rows';
-- internal functions, not user accessible
-- internal functions, not user accessible
CREATE FUNCTION citus_extradata_container(INTERNAL)
RETURNS void
@ -385,7 +385,7 @@ GRANT SELECT ON pg_catalog.pg_dist_partition TO public;
GRANT SELECT ON pg_catalog.pg_dist_shard TO public;
GRANT SELECT ON pg_catalog.pg_dist_shard_placement TO public;
-- empty, but required to update the extension version
-- empty, but required to update the extension version
CREATE FUNCTION pg_catalog.master_modify_multiple_shards(text)
RETURNS integer
LANGUAGE C STRICT
@ -462,7 +462,7 @@ CREATE SEQUENCE citus.pg_dist_node_nodeid_seq
ALTER SEQUENCE citus.pg_dist_groupid_seq SET SCHEMA pg_catalog;
ALTER SEQUENCE citus.pg_dist_node_nodeid_seq SET SCHEMA pg_catalog;
-- add pg_dist_node
-- add pg_dist_node
CREATE TABLE citus.pg_dist_node(
nodeid int NOT NULL DEFAULT nextval('pg_dist_groupid_seq') PRIMARY KEY,
groupid int NOT NULL DEFAULT nextval('pg_dist_node_nodeid_seq'),
@ -496,14 +496,6 @@ CREATE FUNCTION master_remove_node(nodename text, nodeport integer)
COMMENT ON FUNCTION master_remove_node(nodename text, nodeport integer)
IS 'remove node from the cluster';
-- this only needs to run once, now.
CREATE FUNCTION master_initialize_node_metadata()
RETURNS BOOL
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_initialize_node_metadata$$;
SELECT master_initialize_node_metadata();
RESET search_path;
CREATE FUNCTION pg_catalog.master_get_new_placementid()
@ -552,7 +544,7 @@ CREATE TABLE citus.pg_dist_local_group(
groupid int NOT NULL PRIMARY KEY)
;
-- insert the default value for being the coordinator node
-- insert the default value for being the coordinator node
INSERT INTO citus.pg_dist_local_group VALUES (0);
ALTER TABLE citus.pg_dist_local_group SET SCHEMA pg_catalog;
@ -588,7 +580,7 @@ CREATE SEQUENCE citus.pg_dist_colocationid_seq
ALTER SEQUENCE citus.pg_dist_colocationid_seq SET SCHEMA pg_catalog;
-- add pg_dist_colocation
-- add pg_dist_colocation
CREATE TABLE citus.pg_dist_colocation(
colocationid int NOT NULL PRIMARY KEY,
shardcount int NOT NULL,

View File

@ -1,4 +1,4 @@
-- citus--8.0-8--8.0-9
-- citus--8.0-8--8.0-9
SET search_path = 'pg_catalog';
REVOKE ALL ON FUNCTION master_activate_node(text,int) FROM PUBLIC;
@ -6,7 +6,6 @@ REVOKE ALL ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) FROM
REVOKE ALL ON FUNCTION master_add_node(text,int,int,noderole,name) FROM PUBLIC;
REVOKE ALL ON FUNCTION master_add_secondary_node(text,int,text,int,name) FROM PUBLIC;
REVOKE ALL ON FUNCTION master_disable_node(text,int) FROM PUBLIC;
REVOKE ALL ON FUNCTION master_initialize_node_metadata() FROM PUBLIC;
REVOKE ALL ON FUNCTION master_remove_node(text,int) FROM PUBLIC;
REVOKE ALL ON FUNCTION master_update_node(int,text,int) FROM PUBLIC;

View File

@ -10,3 +10,7 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS
UPDATE pg_dist_colocation SET replicationfactor = -1 WHERE distributioncolumntype = 0;
#include "udfs/any_value/9.1-1.sql"
-- drop function which was used for upgrading from 6.0
-- creation was removed from citus--7.0-1.sql
DROP FUNCTION IF EXISTS pg_catalog.master_initialize_node_metadata;

View File

@ -81,7 +81,6 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpDistributedTableDependencies(WorkerNode *workerNode);
static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
@ -97,7 +96,6 @@ PG_FUNCTION_INFO_V1(master_remove_node);
PG_FUNCTION_INFO_V1(master_disable_node);
PG_FUNCTION_INFO_V1(master_activate_node);
PG_FUNCTION_INFO_V1(master_update_node);
PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
@ -751,43 +749,6 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
}
/*
* master_initialize_node_metadata is run once, when upgrading citus. It ingests the
* existing pg_worker_list.conf into pg_dist_node, then adds a header to the file stating
* that it's no longer used.
*/
Datum
master_initialize_node_metadata(PG_FUNCTION_ARGS)
{
List *workerNodes = NIL;
WorkerNode *workerNode = NULL;
CheckCitusVersion(ERROR);
/*
* This function should only ever be called from the create extension
* script, but just to be sure, take an exclusive lock on pg_dist_node
* to prevent concurrent calls.
*/
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerNodes = ParseWorkerNodeFileAndRename();
foreach_ptr(workerNode, workerNodes)
{
bool nodeAlreadyExists = false;
NodeMetadata nodeMetadata = DefaultNodeMetadata();
nodeMetadata.nodeRack = workerNode->workerRack;
nodeMetadata.isActive = workerNode->isActive;
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, &nodeMetadata,
&nodeAlreadyExists);
}
PG_RETURN_BOOL(true);
}
/*
* get_shard_id_for_distribution_column function takes a distributed table name and a
* distribution value then returns shard id of the shard which belongs to given table and
@ -1472,159 +1433,6 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
}
/*
* ParseWorkerNodeFileAndRename opens and parses the node name and node port from the
* specified configuration file and after that, renames it marking it is not used anymore.
* Note that this function is deprecated. Do not use this function for any new
* features.
*/
static List *
ParseWorkerNodeFileAndRename()
{
FILE *workerFileStream = NULL;
List *workerNodeList = NIL;
char workerNodeLine[MAXPGPATH];
char *workerFilePath = make_absolute_path(WorkerListFileName);
StringInfo renamedWorkerFilePath = makeStringInfo();
char *workerPatternTemplate = "%%%u[^# \t]%%*[ \t]%%%u[^# \t]%%*[ \t]%%%u[^# \t]";
char workerLinePattern[1024];
const int workerNameIndex = 0;
const int workerPortIndex = 1;
memset(workerLinePattern, '\0', sizeof(workerLinePattern));
workerFileStream = AllocateFile(workerFilePath, PG_BINARY_R);
if (workerFileStream == NULL)
{
if (errno == ENOENT)
{
ereport(DEBUG1, (errmsg("worker list file located at \"%s\" is not present",
workerFilePath)));
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open worker list file \"%s\": %m",
workerFilePath)));
}
return NIL;
}
/* build pattern to contain node name length limit */
snprintf(workerLinePattern, sizeof(workerLinePattern), workerPatternTemplate,
WORKER_LENGTH, MAX_PORT_LENGTH, WORKER_LENGTH);
while (fgets(workerNodeLine, sizeof(workerNodeLine), workerFileStream) != NULL)
{
const int workerLineLength = strnlen(workerNodeLine, MAXPGPATH);
WorkerNode *workerNode = NULL;
char *linePointer = NULL;
int32 nodePort = 5432; /* default port number */
int fieldCount = 0;
bool lineIsInvalid = false;
char nodeName[WORKER_LENGTH + 1];
char nodeRack[WORKER_LENGTH + 1];
char nodePortString[MAX_PORT_LENGTH + 1];
memset(nodeName, '\0', sizeof(nodeName));
strlcpy(nodeRack, WORKER_DEFAULT_RACK, sizeof(nodeRack));
memset(nodePortString, '\0', sizeof(nodePortString));
if (workerLineLength == MAXPGPATH - 1)
{
ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
errmsg("worker node list file line exceeds the maximum "
"length of %d", MAXPGPATH)));
}
/* trim trailing newlines preserved by fgets, if any */
linePointer = workerNodeLine + workerLineLength - 1;
while (linePointer >= workerNodeLine &&
(*linePointer == '\n' || *linePointer == '\r'))
{
*linePointer-- = '\0';
}
/* skip leading whitespace */
for (linePointer = workerNodeLine; *linePointer; linePointer++)
{
if (!isspace((unsigned char) *linePointer))
{
break;
}
}
/* if the entire line is whitespace or a comment, skip it */
if (*linePointer == '\0' || *linePointer == '#')
{
continue;
}
/* parse line; node name is required, but port and rack are optional */
fieldCount = sscanf(linePointer, workerLinePattern,
nodeName, nodePortString, nodeRack);
/* adjust field count for zero based indexes */
fieldCount--;
/* raise error if no fields were assigned */
if (fieldCount < workerNameIndex)
{
lineIsInvalid = true;
}
/* no special treatment for nodeName: already parsed by sscanf */
/* if a second token was specified, convert to integer port */
if (fieldCount >= workerPortIndex)
{
char *nodePortEnd = NULL;
errno = 0;
nodePort = strtol(nodePortString, &nodePortEnd, 10);
if (errno != 0 || (*nodePortEnd) != '\0' || nodePort <= 0)
{
lineIsInvalid = true;
}
}
if (lineIsInvalid)
{
ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR),
errmsg("could not parse worker node line: %s",
workerNodeLine),
errhint("Lines in the worker node file must contain a valid "
"node name and, optionally, a positive port number. "
"Comments begin with a '#' character and extend to "
"the end of their line.")));
}
/* allocate worker node structure and set fields */
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort;
workerNode->hasMetadata = false;
workerNode->metadataSynced = false;
workerNode->isActive = true;
workerNodeList = lappend(workerNodeList, workerNode);
}
/* rename the file, marking that it is not used anymore */
appendStringInfo(renamedWorkerFilePath, "%s", workerFilePath);
appendStringInfo(renamedWorkerFilePath, ".obsolete");
rename(workerFilePath, renamedWorkerFilePath->data);
FreeFile(workerFileStream);
free(workerFilePath);
return workerNodeList;
}
/*
* TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
* converts this tuple to an equivalent struct in memory. The function assumes

View File

@ -27,9 +27,6 @@
/* Maximum length of worker port number (represented as string) */
#define MAX_PORT_LENGTH 10
/* default filename for citus.worker_list_file */
#define WORKER_LIST_FILENAME "pg_worker_list.conf"
/* Implementation specific definitions used in finding worker nodes */
#define WORKER_RACK_TRIES 5
#define WORKER_DEFAULT_RACK "default"

View File

@ -151,8 +151,6 @@ GRANT EXECUTE ON FUNCTION master_remove_node(text,int) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_metadata_user;
-- try to manipulate node metadata via non-super user
SET ROLE non_super_user;
SELECT 1 FROM master_initialize_node_metadata();
ERROR: permission denied for function master_initialize_node_metadata
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port + 1);
ERROR: permission denied for function master_add_inactive_node
SELECT 1 FROM master_activate_node('localhost', :worker_2_port + 1);

View File

@ -70,7 +70,6 @@ GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_meta
-- try to manipulate node metadata via non-super user
SET ROLE non_super_user;
SELECT 1 FROM master_initialize_node_metadata();
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port + 1);
SELECT 1 FROM master_activate_node('localhost', :worker_2_port + 1);
SELECT 1 FROM master_disable_node('localhost', :worker_2_port + 1);