diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index e1e3ac205..9e643b00c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -42,14 +42,17 @@ bool AllModificationsCommutative = false; static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); -static uint64 ExecuteDistributedModify(Task *task); -static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, - Task *task, EState *executorState, - TupleDesc tupleDescriptor, - DestReceiver *destination); +static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, + Task *task, + bool isModificationQuery, + bool expectResults); +static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, + DestReceiver *destination, + Tuplestorestate *tupleStore); static bool SendQueryInSingleRowMode(PGconn *connection, char *query); -static bool StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore); +static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, + TupleDesc tupleDescriptor, int64 *rows); +static bool ConsumeQueryResult(PGconn *connection, int64 *rows); /* @@ -181,6 +184,9 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas EState *estate = queryDesc->estate; CmdType operation = queryDesc->operation; MemoryContext oldcontext = NULL; + DestReceiver *destination = queryDesc->dest; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning; Assert(estate != NULL); Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); @@ -201,24 +207,72 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) - { - uint64 affectedRowCount = ExecuteDistributedModify(task); - estate->es_processed = affectedRowCount; - } - else if (operation == CMD_SELECT) - { - DestReceiver *destination = queryDesc->dest; - TupleDesc resultTupleDescriptor = queryDesc->tupDesc; + estate->es_processed = 0; - ExecuteSingleShardSelect(queryDesc, count, task, estate, - resultTupleDescriptor, destination); - } - else + /* startup the tuple receiver */ + if (sendTuples) { - ereport(ERROR, (errmsg("unrecognized operation code: %d", - (int) operation))); + (*destination->rStartup)(destination, operation, queryDesc->tupDesc); + } + + /* + * If query has not yet been executed, do so now. The main reason why the + * query might already have been executed is cursors. + */ + if (!routerState->eof_underlying) + { + bool resultsOK = false; + bool isModificationQuery = false; + + if (operation == CMD_INSERT || operation == CMD_UPDATE || + operation == CMD_DELETE) + { + isModificationQuery = true; + } + else if (operation != CMD_SELECT) + { + ereport(ERROR, (errmsg("unrecognized operation code: %d", + (int) operation))); + } + + resultsOK = ExecuteTaskAndStoreResults(queryDesc, task, + isModificationQuery, + sendTuples); + if (!resultsOK) + { + ereport(ERROR, (errmsg("could not receive query results"))); + } + + /* mark underlying query as having executed */ + routerState->eof_underlying = true; + } + + /* if the underlying query produced output, return it */ + if (routerState->tuplestorestate != NULL) + { + TupleDesc resultTupleDescriptor = queryDesc->tupDesc; + int64 returnedRows = 0; + + /* return rows from the tuplestore */ + returnedRows = ReturnRowsFromTuplestore(count, resultTupleDescriptor, + destination, + routerState->tuplestorestate); + + /* + * Count tuples processed, if this is a SELECT. (For modifications + * it'll already have been increased, as we want the number of + * modified tuples, not the number of RETURNed tuples.) + */ + if (operation == CMD_SELECT) + { + estate->es_processed += returnedRows; + } + } + + /* shutdown tuple receiver, if we started it */ + if (sendTuples) + { + (*destination->rShutdown)(destination); } if (queryDesc->totaltime != NULL) @@ -231,153 +285,159 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas /* - * ExecuteDistributedModify is the main entry point for modifying distributed - * tables. A distributed modification is successful if any placement of the - * distributed table is successful. ExecuteDistributedModify returns the number - * of modified rows in that case and errors in all others. This function will - * also generate warnings for individual placement failures. + * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves + * the results and stores them, if SELECT or RETURNING is used, in a tuple + * store. + * + * If the task fails on one of the placements, the function retries it on + * other placements (SELECT), reraises the remote error (constraint violation + * in DML), marks the affected placement as invalid (DML on some placement + * failed), or errors out (DML failed on all placements). */ -static uint64 -ExecuteDistributedModify(Task *task) +static bool +ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, + bool isModificationQuery, + bool expectResults) { - int64 affectedTupleCount = -1; + TupleDesc tupleDescriptor = queryDesc->tupDesc; + EState *executorState = queryDesc->estate; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + bool resultsOK = false; + List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; List *failedPlacementList = NIL; ListCell *failedPlacementCell = NULL; + int64 affectedTupleCount = -1; + bool gotResults = false; - foreach(taskPlacementCell, task->taskPlacementList) + /* + * Try to run the query to completion on one placement. If the query fails + * attempt the query on the next placement. + */ + foreach(taskPlacementCell, taskPlacementList) { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); char *nodeName = taskPlacement->nodeName; int32 nodePort = taskPlacement->nodePort; + bool queryOK = false; + int64 currentAffectedTupleCount = 0; + PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); - PGconn *connection = NULL; - PGresult *result = NULL; - char *currentAffectedTupleString = NULL; - int64 currentAffectedTupleCount = -1; - - Assert(taskPlacement->shardState == FILE_FINALIZED); - - connection = GetOrEstablishConnection(nodeName, nodePort); if (connection == NULL) { failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } - result = PQexec(connection, task->queryString); - if (PQresultStatus(result) != PGRES_COMMAND_OK) + queryOK = SendQueryInSingleRowMode(connection, task->queryString); + if (!queryOK) { - char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); - int category = 0; - bool raiseError = false; - - /* - * If the error code is in constraint violation class, we want to - * fail fast because we must get the same error from all shard - * placements. - */ - category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); - raiseError = SqlStateMatchesCategory(sqlStateString, category); - - if (raiseError) - { - ReraiseRemoteError(connection, result); - } - else - { - WarnRemoteError(connection, result); - } - PQclear(result); - + PurgeConnection(connection); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } - currentAffectedTupleString = PQcmdTuples(result); - - /* could throw error if input > MAX_INT64 */ - scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); - Assert(currentAffectedTupleCount >= 0); - -#if (PG_VERSION_NUM < 90600) - - /* before 9.6, PostgreSQL used a uint32 for this field, so check */ - Assert(currentAffectedTupleCount <= 0xFFFFFFFF); -#endif - - if ((affectedTupleCount == -1) || - (affectedTupleCount == currentAffectedTupleCount)) + /* + * If caller is interested, store query results the first time + * through. The output of the query's execution on other shards is + * discarded if we run there (because it's a modification query). + */ + if (!gotResults && expectResults) { - affectedTupleCount = currentAffectedTupleCount; + queryOK = StoreQueryResult(routerState, connection, tupleDescriptor, + ¤tAffectedTupleCount); } else { - ereport(WARNING, - (errmsg("modified " INT64_FORMAT " tuples, but expected to modify " - INT64_FORMAT, currentAffectedTupleCount, affectedTupleCount), - errdetail("modified placement on %s:%d", nodeName, nodePort))); + queryOK = ConsumeQueryResult(connection, ¤tAffectedTupleCount); } - PQclear(result); + if (queryOK) + { + if ((affectedTupleCount == -1) || + (affectedTupleCount == currentAffectedTupleCount)) + { + affectedTupleCount = currentAffectedTupleCount; + } + else + { + ereport(WARNING, + (errmsg("modified "INT64_FORMAT " tuples, but expected " + "to modify "INT64_FORMAT, + currentAffectedTupleCount, affectedTupleCount), + errdetail("modified placement on %s:%d", + nodeName, nodePort))); + } + +#if (PG_VERSION_NUM < 90600) + + /* before 9.6, PostgreSQL used a uint32 for this field, so check */ + Assert(currentAffectedTupleCount <= 0xFFFFFFFF); +#endif + + resultsOK = true; + gotResults = true; + + /* + * Modifications have to be executed on all placements, but for + * read queries we can stop here. + */ + if (!isModificationQuery) + { + break; + } + } + else + { + PurgeConnection(connection); + + failedPlacementList = lappend(failedPlacementList, taskPlacement); + + continue; + } } - /* if all placements failed, error out */ - if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) + if (isModificationQuery) { - ereport(ERROR, (errmsg("could not modify any active placements"))); + /* if all placements failed, error out */ + if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) + { + ereport(ERROR, (errmsg("could not modify any active placements"))); + } + + /* otherwise, mark failed placements as inactive: they're stale */ + foreach(failedPlacementCell, failedPlacementList) + { + ShardPlacement *failedPlacement = + (ShardPlacement *) lfirst(failedPlacementCell); + uint64 shardLength = 0; + + DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, + failedPlacement->nodePort); + InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, + failedPlacement->nodeName, failedPlacement->nodePort); + } + + executorState->es_processed = affectedTupleCount; } - /* otherwise, mark failed placements as inactive: they're stale */ - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell); - uint64 shardLength = 0; - - DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, - failedPlacement->nodePort); - InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, - failedPlacement->nodeName, failedPlacement->nodePort); - } - - return (uint64) affectedTupleCount; + return resultsOK; } /* - * ExecuteSingleShardSelect executes, if not done already, the remote select query and - * sends the resulting tuples to the given destination receiver. If the query fails on a - * given placement, the function attempts it on its replica. + * ReturnRowsFromTuplestore moves rows from a given tuplestore into a + * receiver. It performs the necessary limiting to support cursors. */ -static void -ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, - EState *executorState, TupleDesc tupleDescriptor, - DestReceiver *destination) +static uint64 +ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, + DestReceiver *destination, Tuplestorestate *tupleStore) { - bool resultsOK = false; TupleTableSlot *tupleTableSlot = NULL; - MaterialState *routerState = (MaterialState *) queryDesc->planstate; - Tuplestorestate *tupleStore = routerState->tuplestorestate; uint64 currentTupleCount = 0; - /* initialize tuplestore for the first call */ - if (routerState->tuplestorestate == NULL) - { - routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); - tupleStore = routerState->tuplestorestate; - - resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore); - if (!resultsOK) - { - ereport(ERROR, (errmsg("could not receive query results"))); - } - } - tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor); - /* startup the tuple receiver */ - (*destination->rStartup)(destination, CMD_SELECT, tupleDescriptor); - /* iterate over tuples in tuple store, and send them to destination */ for (;;) { @@ -388,7 +448,6 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, } (*destination->receiveSlot)(tupleTableSlot, destination); - executorState->es_processed++; ExecClearTuple(tupleTableSlot); @@ -404,65 +463,9 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, } } - /* shutdown the tuple receiver */ - (*destination->rShutdown)(destination); - ExecDropSingleTupleTableSlot(tupleTableSlot); -} - -/* - * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves - * the results and stores them in the given tuple store. If the task fails on - * one of the placements, the function retries it on other placements. - */ -bool -ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore) -{ - bool resultsOK = false; - List *taskPlacementList = task->taskPlacementList; - ListCell *taskPlacementCell = NULL; - - /* - * Try to run the query to completion on one placement. If the query fails - * attempt the query on the next placement. - */ - foreach(taskPlacementCell, taskPlacementList) - { - ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); - char *nodeName = taskPlacement->nodeName; - int32 nodePort = taskPlacement->nodePort; - bool queryOK = false; - bool storedOK = false; - - PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); - if (connection == NULL) - { - continue; - } - - queryOK = SendQueryInSingleRowMode(connection, task->queryString); - if (!queryOK) - { - PurgeConnection(connection); - continue; - } - - storedOK = StoreQueryResult(connection, tupleDescriptor, tupleStore); - if (storedOK) - { - resultsOK = true; - break; - } - else - { - tuplestore_clear(tupleStore); - PurgeConnection(connection); - } - } - - return resultsOK; + return currentTupleCount; } @@ -497,25 +500,38 @@ SendQueryInSingleRowMode(PGconn *connection, char *query) /* * StoreQueryResult gets the query results from the given connection, builds - * tuples from the results and stores them in the given tuple-store. If the - * function can't receive query results, it returns false. Note that this - * function assumes the query has already been sent on the connection and the - * tuplestore has earlier been initialized. + * tuples from the results, and stores them in the a newly created + * tuple-store. If the function can't receive query results, it returns + * false. Note that this function assumes the query has already been sent on + * the connection. */ static bool -StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore) +StoreQueryResult(MaterialState *routerState, PGconn *connection, + TupleDesc tupleDescriptor, int64 *rows) { AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); + Tuplestorestate *tupleStore = NULL; uint32 expectedColumnCount = tupleDescriptor->natts; char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *)); + bool commandFailed = false; MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext, "StoreQueryResult", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + *rows = 0; - Assert(tupleStore != NULL); + if (routerState->tuplestorestate == NULL) + { + routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); + } + else + { + /* might have failed query execution on another placement before */ + tuplestore_clear(routerState->tuplestorestate); + } + + tupleStore = routerState->tuplestorestate; for (;;) { @@ -534,10 +550,33 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, resultStatus = PQresultStatus(result); if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK)) { - WarnRemoteError(connection, result); + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + int category = 0; + bool raiseError = false; + + /* + * If the error code is in constraint violation class, we want to + * fail fast because we must get the same error from all shard + * placements. + */ + category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); + raiseError = SqlStateMatchesCategory(sqlStateString, category); + + if (raiseError) + { + ReraiseRemoteError(connection, result); + } + else + { + WarnRemoteError(connection, result); + } + PQclear(result); - return false; + commandFailed = true; + + /* continue, there could be other lingering results due to row mode */ + continue; } rowCount = PQntuples(result); @@ -575,6 +614,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, tuplestore_puttuple(tupleStore, heapTuple); MemoryContextReset(ioContext); + (*rows)++; } PQclear(result); @@ -582,7 +622,98 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, pfree(columnArray); - return true; + return !commandFailed; +} + + +/* + * ConsumeQueryResult gets a query result from a connection, counting the rows + * and checking for errors, but otherwise discarding potentially returned + * rows. Returns true if a non-error result has been returned, false if there + * has been an error. + */ +static bool +ConsumeQueryResult(PGconn *connection, int64 *rows) +{ + bool commandFailed = false; + bool gotResponse = false; + + *rows = 0; + + /* + * Due to single row mode we have to do multiple PQgetResult() to finish + * processing of this query, even without RETURNING. For single-row mode + * we have to loop until all rows are consumed. + */ + while (true) + { + PGresult *result = PQgetResult(connection); + ExecStatusType status = PGRES_COMMAND_OK; + + if (result == NULL) + { + break; + } + + status = PQresultStatus(result); + + if (status != PGRES_COMMAND_OK && + status != PGRES_SINGLE_TUPLE && + status != PGRES_TUPLES_OK) + { + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + int category = 0; + bool raiseError = false; + + /* + * If the error code is in constraint violation class, we want to + * fail fast because we must get the same error from all shard + * placements. + */ + category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); + raiseError = SqlStateMatchesCategory(sqlStateString, category); + + if (raiseError) + { + ReraiseRemoteError(connection, result); + } + else + { + WarnRemoteError(connection, result); + } + PQclear(result); + + commandFailed = true; + + /* continue, there could be other lingering results due to row mode */ + continue; + } + + if (status == PGRES_COMMAND_OK) + { + char *currentAffectedTupleString = PQcmdTuples(result); + int64 currentAffectedTupleCount = 0; + + scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); + Assert(currentAffectedTupleCount >= 0); + +#if (PG_VERSION_NUM < 90600) + + /* before 9.6, PostgreSQL used a uint32 for this field, so check */ + Assert(currentAffectedTupleCount <= 0xFFFFFFFF); +#endif + *rows += currentAffectedTupleCount; + } + else + { + *rows += PQntuples(result); + } + + PQclear(result); + gotResponse = true; + } + + return gotResponse && !commandFailed; } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b2400071e..f18af7686 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -18,8 +18,6 @@ extern bool AllModificationsCommutative; extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Task *task); -extern bool ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc);