diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 88d465337..4fe31a7dc 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -315,6 +315,53 @@ MultiClientConnectionUp(int32 connectionId) } +/* MultiClientExecute synchronously executes a query over the given connection. */ +bool +MultiClientExecute(int32 connectionId, const char *query, void **queryResult, + int *rowCount, int *columnCount) +{ + bool querySent = false; + bool queryReady = false; + bool queryOK = false; + WaitInfo *waitInfo = NULL; + + querySent = MultiClientSendQuery(connectionId, query); + if (!querySent) + { + return false; + } + + waitInfo = MultiClientCreateWaitInfo(1); + + while (!queryReady) + { + ResultStatus resultStatus = MultiClientResultStatus(connectionId); + if (resultStatus == CLIENT_RESULT_READY) + { + queryReady = true; + } + else if (resultStatus == CLIENT_RESULT_BUSY) + { + /* wait for results, errors, or interrupts */ + MultiClientResetWaitInfo(waitInfo); + MultiClientRegisterWait(waitInfo, TASK_STATUS_SOCKET_READ, connectionId); + MultiClientWait(waitInfo); + } + else + { + MultiClientFreeWaitInfo(waitInfo); + return false; + } + } + + queryOK = MultiClientQueryResult(connectionId, queryResult, rowCount, columnCount); + + MultiClientFreeWaitInfo(waitInfo); + + return queryOK; +} + + /* MultiClientSendQuery sends the given query over the given connection. */ bool MultiClientSendQuery(int32 connectionId, const char *query) @@ -532,6 +579,15 @@ MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex) } +/* MultiClientValueIsNull returns whether the value at the given position is null. */ +bool +MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex) +{ + bool isNull = PQgetisnull((PGresult *) queryResult, rowIndex, columnIndex); + return isNull; +} + + /* MultiClientClearResult free's the memory associated with a PGresult. */ void MultiClientClearResult(void *queryResult) diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index ab5b9bc38..e9658ee22 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -104,6 +104,8 @@ extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort, extern ConnectStatus MultiClientConnectPoll(int32 connectionId); extern void MultiClientDisconnect(int32 connectionId); extern bool MultiClientConnectionUp(int32 connectionId); +extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult, + int *rowCount, int *columnCount); extern bool MultiClientSendQuery(int32 connectionId, const char *query); extern bool MultiClientCancel(int32 connectionId); extern ResultStatus MultiClientResultStatus(int32 connectionId); @@ -114,6 +116,7 @@ extern bool MultiClientQueryResult(int32 connectionId, void **queryResult, extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount); extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex); +extern bool MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex); extern void MultiClientClearResult(void *queryResult); extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);