Fix isolation tester

add_worker_query_again
Onder Kalaci 2022-02-16 20:39:13 +01:00
parent 75e19087b4
commit d5484d4e69
1 changed files with 67 additions and 43 deletions

View File

@ -41,6 +41,7 @@
static bool allowNonIdleRemoteTransactionOnXactHandling = false;
static MemoryContext LocalConnectionContext = NULL;
static MultiConnection *singleConnection = NULL;
@ -52,6 +53,7 @@ int IsolationTestSessionRemoteProcessID = -1;
int IsolationTestSessionProcessID = -1;
static void EstablishSingleConnnection(char *nodeNameString, int nodePort);
static int64 GetRemoteProcessId(void);
/* declarations for dynamic loading */
@ -108,49 +110,7 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
{
allowNonIdleRemoteTransactionOnXactHandling = true;
char **keywords =
MemoryContextAllocZero(ConnectionContext, 10 *
sizeof(char *));
char **values =
MemoryContextAllocZero(ConnectionContext, 10 *
sizeof(char *));
keywords[0] = "host";
values[0] = MemoryContextStrdup(ConnectionContext, nodeNameString);
keywords[1] = "port";
StringInfo str = makeStringInfo();
appendStringInfo(str, "%d", nodePort);
values[1] = str->data;
keywords[2] = "dbname";
values[2] = MemoryContextStrdup(ConnectionContext,
(char *) CurrentDatabaseName());
keywords[3] = "user";
values[3] = MemoryContextStrdup(ConnectionContext, CurrentUserName());
keywords[4] = "client_encoding";
values[4] = MemoryContextStrdup(ConnectionContext, pstrdup(
(char *) GetDatabaseEncodingName()));
keywords[5] = "application_name";
values[5] = "citus isolation tester";
keywords[6] = values[6] = NULL;
singleConnection = palloc(sizeof(MultiConnection));
strlcpy(singleConnection->hostname, nodeNameString, MAX_NODE_LENGTH);
singleConnection->port = nodePort;
singleConnection->pgConn = PQconnectdbParams((const char **) keywords,
(const char **) values,
false);
EstablishSingleConnnection(nodeNameString, nodePort);
}
if (PQstatus(singleConnection->pgConn) != CONNECTION_OK)
@ -168,6 +128,70 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
}
static void
EstablishSingleConnnection(char *nodeNameString, int nodePort)
{
if (LocalConnectionContext != NULL)
{
MemoryContextDelete(LocalConnectionContext);
LocalConnectionContext = NULL;
}
LocalConnectionContext =
AllocSetContextCreateExtended(ConnectionContext,
"Isolation Test Connection Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
char **keywords =
MemoryContextAllocZero(LocalConnectionContext, 6 *
sizeof(char *));
char **values =
MemoryContextAllocZero(LocalConnectionContext, 6 *
sizeof(char *));
keywords[0] = "host";
values[0] = MemoryContextStrdup(LocalConnectionContext, nodeNameString);
keywords[1] = "port";
StringInfo str = makeStringInfo();
appendStringInfo(str, "%d", nodePort);
values[1] = MemoryContextStrdup(LocalConnectionContext, str->data);
keywords[2] = "dbname";
values[2] = MemoryContextStrdup(LocalConnectionContext,
(char *) CurrentDatabaseName());
keywords[3] = "user";
values[3] = MemoryContextStrdup(LocalConnectionContext, CurrentUserName());
keywords[4] = "client_encoding";
values[4] = MemoryContextStrdup(LocalConnectionContext, pstrdup(
(char *) GetDatabaseEncodingName()));
keywords[5] = "application_name";
values[5] = "citus isolation tester";
/* libpq expects this */
keywords[6] = values[6] = NULL;
singleConnection = MemoryContextAlloc(LocalConnectionContext,
sizeof(MultiConnection));
strlcpy(singleConnection->hostname, nodeNameString, MAX_NODE_LENGTH);
singleConnection->port = nodePort;
singleConnection->pgConn = PQconnectdbParams((const char **) keywords,
(const char **) values,
false);
}
/*
* run_commands_on_session_level_connection_to_node runs to consecutive commands
* from the same connection opened by start_session_level_connection_to_node.