Fix isolation tester

add_worker_query_again
Onder Kalaci 2022-02-16 18:28:40 +01:00
parent 0d9bf7ad22
commit 8b04e37b73
2 changed files with 44 additions and 12 deletions

View File

@ -235,15 +235,10 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
char nodePortString[12] = ""; char nodePortString[12] = "";
StringInfo applicationName = makeStringInfo(); StringInfo applicationName = makeStringInfo();
if (AllowNonIdleTransactionOnXactHandling())
{ appendStringInfo(applicationName, "%s%ld", CITUS_APPLICATION_NAME_PREFIX,
appendStringInfoString(applicationName, "citus isolation test"); GetGlobalPID());
}
else
{
appendStringInfo(applicationName, "%s%ld", CITUS_APPLICATION_NAME_PREFIX,
GetGlobalPID());
}
/* /*
* This function has three sections: * This function has three sections:

View File

@ -71,6 +71,9 @@ AllowNonIdleTransactionOnXactHandling(void)
} }
#include "mb/pg_wchar.h"
/* /*
* start_session_level_connection_to_node helps us to open and keep connections * start_session_level_connection_to_node helps us to open and keep connections
* open while sending consecutive commands, even if they are outside the transaction. * open while sending consecutive commands, even if they are outside the transaction.
@ -88,7 +91,6 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
text *nodeName = PG_GETARG_TEXT_P(0); text *nodeName = PG_GETARG_TEXT_P(0);
uint32 nodePort = PG_GETARG_UINT32(1); uint32 nodePort = PG_GETARG_UINT32(1);
char *nodeNameString = text_to_cstring(nodeName); char *nodeNameString = text_to_cstring(nodeName);
int connectionFlags = 0;
if (singleConnection != NULL && (strcmp(singleConnection->hostname, if (singleConnection != NULL && (strcmp(singleConnection->hostname,
nodeNameString) != 0 || nodeNameString) != 0 ||
@ -106,7 +108,42 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
{ {
allowNonIdleRemoteTransactionOnXactHandling = true; allowNonIdleRemoteTransactionOnXactHandling = true;
singleConnection = GetNodeConnection(connectionFlags, nodeNameString, nodePort); char **keywords =
MemoryContextAllocZero(CurrentMemoryContext, 10 *
sizeof(char *));
char **values =
MemoryContextAllocZero(CurrentMemoryContext, 10 *
sizeof(char *));
keywords[0] = "host";
values[0] = nodeNameString;
keywords[1] = "port";
char *portSTR = (char *) palloc(10);
pg_ultoa_n(nodePort, portSTR);
values[1] = portSTR;
keywords[2] = "dbname";
values[2] = (char *) CurrentDatabaseName();
keywords[3] = "user";
values[3] = CurrentUserName();
keywords[4] = "client_encoding";
values[4] = (char *) GetDatabaseEncodingName();
keywords[5] = "application_name";
values[5] = "citus isolation tester";
keywords[6] = values[6] = NULL;
singleConnection = palloc(sizeof(MultiConnection));
singleConnection->pgConn = PQconnectdbParams((const char **) keywords,
(const char **) values,
false);
} }
if (PQstatus(singleConnection->pgConn) != CONNECTION_OK) if (PQstatus(singleConnection->pgConn) != CONNECTION_OK)
@ -187,7 +224,7 @@ stop_session_level_connection_to_node(PG_FUNCTION_ARGS)
if (singleConnection != NULL) if (singleConnection != NULL)
{ {
CloseConnection(singleConnection); PQfinish(singleConnection->pgConn);
singleConnection = NULL; singleConnection = NULL;
} }