mirror of https://github.com/citusdata/citus.git
Merge pull request #675 from citusdata/fix_purge_segfault
Quick fix for possible segfault in PurgeConnection cr: @anarazelpull/685/head
commit
cd3b49e70b
|
@ -13,7 +13,6 @@
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
#include "c.h"
|
#include "c.h"
|
||||||
#include "fmgr.h"
|
#include "fmgr.h"
|
||||||
|
|
||||||
#include "libpq-fe.h"
|
#include "libpq-fe.h"
|
||||||
|
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
|
@ -22,7 +21,9 @@
|
||||||
|
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "distributed/connection_cache.h"
|
#include "distributed/connection_cache.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
||||||
|
#include "utils/elog.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,6 +35,7 @@ static Datum ExtractIntegerDatum(char *input);
|
||||||
PG_FUNCTION_INFO_V1(initialize_remote_temp_table);
|
PG_FUNCTION_INFO_V1(initialize_remote_temp_table);
|
||||||
PG_FUNCTION_INFO_V1(count_remote_temp_table_rows);
|
PG_FUNCTION_INFO_V1(count_remote_temp_table_rows);
|
||||||
PG_FUNCTION_INFO_V1(get_and_purge_connection);
|
PG_FUNCTION_INFO_V1(get_and_purge_connection);
|
||||||
|
PG_FUNCTION_INFO_V1(connect_and_purge_connection);
|
||||||
PG_FUNCTION_INFO_V1(set_connection_status_bad);
|
PG_FUNCTION_INFO_V1(set_connection_status_bad);
|
||||||
|
|
||||||
|
|
||||||
|
@ -128,6 +130,30 @@ get_and_purge_connection(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* get_and_purge_connection first gets a connection using the provided hostname
|
||||||
|
* and port before immediately passing that connection to PurgeConnection. This
|
||||||
|
* is to test PurgeConnection behvaior when circumventing the cache.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
connect_and_purge_connection(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
char *nodeName = PG_GETARG_CSTRING(0);
|
||||||
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
|
char *nodeUser = CurrentUserName();
|
||||||
|
|
||||||
|
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
|
||||||
|
if (connection == NULL)
|
||||||
|
{
|
||||||
|
PG_RETURN_BOOL(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
PurgeConnection(connection);
|
||||||
|
|
||||||
|
PG_RETURN_BOOL(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* set_connection_status_bad does not remove the given connection from the connection hash.
|
* set_connection_status_bad does not remove the given connection from the connection hash.
|
||||||
* It simply shuts down the underlying socket. On success, it returns true.
|
* It simply shuts down the underlying socket. On success, it returns true.
|
||||||
|
|
|
@ -14,16 +14,14 @@
|
||||||
#include "libpq-fe.h"
|
#include "libpq-fe.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
#include "distributed/connection_cache.h"
|
#include "distributed/connection_cache.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "lib/stringinfo.h"
|
|
||||||
#include "mb/pg_wchar.h"
|
#include "mb/pg_wchar.h"
|
||||||
#include "nodes/pg_list.h"
|
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/elog.h"
|
#include "utils/elog.h"
|
||||||
#include "utils/errcodes.h"
|
#include "utils/errcodes.h"
|
||||||
|
@ -172,10 +170,6 @@ PurgeConnection(PGconn *connection)
|
||||||
*/
|
*/
|
||||||
if (purgedConnection != connection)
|
if (purgedConnection != connection)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("hash entry for \"%s:%d\" contained different "
|
|
||||||
"connection than that provided by caller",
|
|
||||||
nodeConnectionKey.nodeName,
|
|
||||||
nodeConnectionKey.nodePort)));
|
|
||||||
PQfinish(connection);
|
PQfinish(connection);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,22 +180,21 @@ PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
|
||||||
{
|
{
|
||||||
bool entryFound = false;
|
bool entryFound = false;
|
||||||
NodeConnectionEntry *nodeConnectionEntry = NULL;
|
NodeConnectionEntry *nodeConnectionEntry = NULL;
|
||||||
|
PGconn *connection = NULL;
|
||||||
|
|
||||||
|
if (NodeConnectionHash != NULL)
|
||||||
|
{
|
||||||
|
nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey,
|
||||||
|
HASH_REMOVE, &entryFound);
|
||||||
|
}
|
||||||
|
|
||||||
nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, HASH_REMOVE,
|
|
||||||
&entryFound);
|
|
||||||
if (entryFound)
|
if (entryFound)
|
||||||
{
|
{
|
||||||
|
connection = nodeConnectionEntry->connection;
|
||||||
PQfinish(nodeConnectionEntry->connection);
|
PQfinish(nodeConnectionEntry->connection);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(WARNING, (errcode(ERRCODE_NO_DATA),
|
|
||||||
errmsg("could not find hash entry for connection to \"%s:%d\"",
|
|
||||||
nodeConnectionKey->nodeName,
|
|
||||||
nodeConnectionKey->nodePort)));
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodeConnectionEntry->connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ extern Datum fake_fdw_handler(PG_FUNCTION_ARGS);
|
||||||
extern Datum initialize_remote_temp_table(PG_FUNCTION_ARGS);
|
extern Datum initialize_remote_temp_table(PG_FUNCTION_ARGS);
|
||||||
extern Datum count_remote_temp_table_rows(PG_FUNCTION_ARGS);
|
extern Datum count_remote_temp_table_rows(PG_FUNCTION_ARGS);
|
||||||
extern Datum get_and_purge_connection(PG_FUNCTION_ARGS);
|
extern Datum get_and_purge_connection(PG_FUNCTION_ARGS);
|
||||||
|
extern Datum connect_and_purge_connection(PG_FUNCTION_ARGS);
|
||||||
extern Datum set_connection_status_bad(PG_FUNCTION_ARGS);
|
extern Datum set_connection_status_bad(PG_FUNCTION_ARGS);
|
||||||
|
|
||||||
/* function declarations for exercising metadata functions */
|
/* function declarations for exercising metadata functions */
|
||||||
|
|
|
@ -15,6 +15,10 @@ CREATE FUNCTION get_and_purge_connection(cstring, integer)
|
||||||
RETURNS bool
|
RETURNS bool
|
||||||
AS 'citus'
|
AS 'citus'
|
||||||
LANGUAGE C STRICT;
|
LANGUAGE C STRICT;
|
||||||
|
CREATE FUNCTION connect_and_purge_connection(cstring, integer)
|
||||||
|
RETURNS bool
|
||||||
|
AS 'citus'
|
||||||
|
LANGUAGE C STRICT;
|
||||||
CREATE FUNCTION set_connection_status_bad(cstring, integer)
|
CREATE FUNCTION set_connection_status_bad(cstring, integer)
|
||||||
RETURNS bool
|
RETURNS bool
|
||||||
AS 'citus'
|
AS 'citus'
|
||||||
|
@ -104,3 +108,11 @@ SELECT get_and_purge_connection('localhost', :worker_port);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
\c
|
||||||
|
-- purge existing connection to localhost
|
||||||
|
SELECT connect_and_purge_connection('localhost', :worker_port);
|
||||||
|
connect_and_purge_connection
|
||||||
|
------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,11 @@ CREATE FUNCTION get_and_purge_connection(cstring, integer)
|
||||||
AS 'citus'
|
AS 'citus'
|
||||||
LANGUAGE C STRICT;
|
LANGUAGE C STRICT;
|
||||||
|
|
||||||
|
CREATE FUNCTION connect_and_purge_connection(cstring, integer)
|
||||||
|
RETURNS bool
|
||||||
|
AS 'citus'
|
||||||
|
LANGUAGE C STRICT;
|
||||||
|
|
||||||
CREATE FUNCTION set_connection_status_bad(cstring, integer)
|
CREATE FUNCTION set_connection_status_bad(cstring, integer)
|
||||||
RETURNS bool
|
RETURNS bool
|
||||||
AS 'citus'
|
AS 'citus'
|
||||||
|
@ -76,3 +81,8 @@ SELECT count_remote_temp_table_rows('localhost', :worker_port);
|
||||||
SELECT get_and_purge_connection('localhost', :worker_port);
|
SELECT get_and_purge_connection('localhost', :worker_port);
|
||||||
|
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
|
||||||
|
\c
|
||||||
|
|
||||||
|
-- purge existing connection to localhost
|
||||||
|
SELECT connect_and_purge_connection('localhost', :worker_port);
|
||||||
|
|
Loading…
Reference in New Issue