mirror of https://github.com/citusdata/citus.git
Add MultiClientExecute and MultiClientValueIsNull for simple remote query execution
parent
ea2c2f096a
commit
5e432449ba
|
@ -315,6 +315,53 @@ MultiClientConnectionUp(int32 connectionId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* MultiClientExecute synchronously executes a query over the given connection. */
|
||||||
|
bool
|
||||||
|
MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
|
||||||
|
int *rowCount, int *columnCount)
|
||||||
|
{
|
||||||
|
bool querySent = false;
|
||||||
|
bool queryReady = false;
|
||||||
|
bool queryOK = false;
|
||||||
|
WaitInfo *waitInfo = NULL;
|
||||||
|
|
||||||
|
querySent = MultiClientSendQuery(connectionId, query);
|
||||||
|
if (!querySent)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
waitInfo = MultiClientCreateWaitInfo(1);
|
||||||
|
|
||||||
|
while (!queryReady)
|
||||||
|
{
|
||||||
|
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
|
||||||
|
if (resultStatus == CLIENT_RESULT_READY)
|
||||||
|
{
|
||||||
|
queryReady = true;
|
||||||
|
}
|
||||||
|
else if (resultStatus == CLIENT_RESULT_BUSY)
|
||||||
|
{
|
||||||
|
/* wait for results, errors, or interrupts */
|
||||||
|
MultiClientResetWaitInfo(waitInfo);
|
||||||
|
MultiClientRegisterWait(waitInfo, TASK_STATUS_SOCKET_READ, connectionId);
|
||||||
|
MultiClientWait(waitInfo);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MultiClientFreeWaitInfo(waitInfo);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
queryOK = MultiClientQueryResult(connectionId, queryResult, rowCount, columnCount);
|
||||||
|
|
||||||
|
MultiClientFreeWaitInfo(waitInfo);
|
||||||
|
|
||||||
|
return queryOK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* MultiClientSendQuery sends the given query over the given connection. */
|
/* MultiClientSendQuery sends the given query over the given connection. */
|
||||||
bool
|
bool
|
||||||
MultiClientSendQuery(int32 connectionId, const char *query)
|
MultiClientSendQuery(int32 connectionId, const char *query)
|
||||||
|
@ -532,6 +579,15 @@ MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* MultiClientValueIsNull returns whether the value at the given position is null. */
|
||||||
|
bool
|
||||||
|
MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex)
|
||||||
|
{
|
||||||
|
bool isNull = PQgetisnull((PGresult *) queryResult, rowIndex, columnIndex);
|
||||||
|
return isNull;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* MultiClientClearResult free's the memory associated with a PGresult. */
|
/* MultiClientClearResult free's the memory associated with a PGresult. */
|
||||||
void
|
void
|
||||||
MultiClientClearResult(void *queryResult)
|
MultiClientClearResult(void *queryResult)
|
||||||
|
|
|
@ -104,6 +104,8 @@ extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
|
||||||
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
|
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
|
||||||
extern void MultiClientDisconnect(int32 connectionId);
|
extern void MultiClientDisconnect(int32 connectionId);
|
||||||
extern bool MultiClientConnectionUp(int32 connectionId);
|
extern bool MultiClientConnectionUp(int32 connectionId);
|
||||||
|
extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
|
||||||
|
int *rowCount, int *columnCount);
|
||||||
extern bool MultiClientSendQuery(int32 connectionId, const char *query);
|
extern bool MultiClientSendQuery(int32 connectionId, const char *query);
|
||||||
extern bool MultiClientCancel(int32 connectionId);
|
extern bool MultiClientCancel(int32 connectionId);
|
||||||
extern ResultStatus MultiClientResultStatus(int32 connectionId);
|
extern ResultStatus MultiClientResultStatus(int32 connectionId);
|
||||||
|
@ -114,6 +116,7 @@ extern bool MultiClientQueryResult(int32 connectionId, void **queryResult,
|
||||||
extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult,
|
extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult,
|
||||||
int *rowCount, int *columnCount);
|
int *rowCount, int *columnCount);
|
||||||
extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
|
extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
|
||||||
|
extern bool MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex);
|
||||||
extern void MultiClientClearResult(void *queryResult);
|
extern void MultiClientClearResult(void *queryResult);
|
||||||
extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);
|
extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue