pull/6278/merge
Marco Slot 2025-02-11 20:29:39 +00:00 committed by GitHub
commit 9a243cb3a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 92 additions and 86 deletions

View File

@@ -157,14 +157,38 @@ void
CitusExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once)
{
DestReceiver *dest = queryDesc->dest;
/*
* Disable execution of ALTER TABLE constraint validation queries. These
* constraints will be validated in worker nodes, so running these queries
* from the coordinator would be redundant.
*
* For example, ALTER TABLE ... ATTACH PARTITION checks that the new
* partition doesn't violate constraints of the parent table, which
* might involve running some SELECT queries.
*
* Ideally we'd completely skip these checks in the coordinator, but we don't
* have any means to tell postgres to skip the checks. So the best we can do is
* to not execute the queries and return an empty result set, as if this table has
* no rows, so no constraints will be violated.
*/
if (AlterTableConstraintCheck(queryDesc))
{
EState *estate = queryDesc->estate;
ParamListInfo savedBoundParams = executorBoundParams;
estate->es_processed = 0;
/* start and shutdown tuple receiver to simulate empty result */
DestReceiver *dest = queryDesc->dest;
dest->rStartup(queryDesc->dest, CMD_SELECT, queryDesc->tupDesc);
dest->rShutdown(dest);
return;
}
/*
* Save a pointer to query params so UDFs can access them by calling
* ExecutorBoundParams().
*/
ParamListInfo savedBoundParams = executorBoundParams;
executorBoundParams = queryDesc->params;
/*
@@ -173,71 +197,73 @@ CitusExecutorRun(QueryDesc *queryDesc,
* we remove the totaltime instrumentation from the queryDesc. Instead we will start
* and stop the instrumentation of the total time and put it back on the queryDesc
* before returning (or rethrowing) from this function.
*
* We include subplan execution (in PreExecScan) in the total time.
*/
Instrumentation *volatile totalTime = queryDesc->totaltime;
queryDesc->totaltime = NULL;
if (totalTime)
{
InstrStartNode(totalTime);
}
/* switch into per-query memory context before calling PreExecScan */
MemoryContext oldcontext = MemoryContextSwitchTo(
queryDesc->estate->es_query_cxt);
/*
* Call PreExecScan for all citus custom scan nodes prior to starting the
* postgres exec scan to give some citus scan nodes some time to initialize
* state that would be too late if it were to initialize when the first tuple
* would need to return.
*/
List *citusCustomScanStates = FindCitusCustomScanStates(queryDesc->planstate);
CitusScanState *citusScanState = NULL;
foreach_ptr(citusScanState, citusCustomScanStates)
{
if (citusScanState->PreExecScan)
{
citusScanState->PreExecScan(citusScanState);
}
}
/* postgres will switch here again and will restore back on its own */
MemoryContextSwitchTo(oldcontext);
/*
* ExecutorLevel is used to detect nested execution and to decide
* how to execute certain queries. For instance, if we execute a
* multi-shard query that involves local shards, and it is in a nested
execution (e.g. a PL/pgSQL function), we choose local execution
* instead of connecting to the local node.
*
* The reason for choosing local execution is that functions can have
* multiple statements, and we could therefore see a mix of statements
* that modify regular tables, Citus tables, and perform joins between
* them. Such queries can only be answered when we use local execution
* throughout the transaction.
*
* We should only raise ExecutorLevel after PreExecScan, which executes
* subplans. Otherwise, top-level subplans will immediately consider
* themselves to be nested and unnecessarily trigger local execution.
*/
ExecutorLevel++;
/*
* We use a PG_TRY to reset our bookkeeping around ExecutorLevel,
* executorBoundParams (used to know parameters in EXPLAIN ANALYZE),
*
* Execution can skip back up several levels and then resume in
* case of savepoints, so we should do the bookkeeping at every
* level instead of e.g. using the abort handler.
*/
PG_TRY();
{
ExecutorLevel++;
if (totalTime)
{
InstrStartNode(totalTime);
}
/*
* Disable execution of ALTER TABLE constraint validation queries. These
* constraints will be validated in worker nodes, so running these queries
* from the coordinator would be redundant.
*
* For example, ALTER TABLE ... ATTACH PARTITION checks that the new
* partition doesn't violate constraints of the parent table, which
* might involve running some SELECT queries.
*
* Ideally we'd completely skip these checks in the coordinator, but we don't
* have any means to tell postgres to skip the checks. So the best we can do is
* to not execute the queries and return an empty result set, as if this table has
* no rows, so no constraints will be violated.
*/
if (AlterTableConstraintCheck(queryDesc))
{
EState *estate = queryDesc->estate;
estate->es_processed = 0;
/* start and shutdown tuple receiver to simulate empty result */
dest->rStartup(queryDesc->dest, CMD_SELECT, queryDesc->tupDesc);
dest->rShutdown(dest);
}
else
{
/* switch into per-query memory context before calling PreExecScan */
MemoryContext oldcontext = MemoryContextSwitchTo(
queryDesc->estate->es_query_cxt);
/*
* Call PreExecScan for all citus custom scan nodes prior to starting the
* postgres exec scan to give some citus scan nodes some time to initialize
* state that would be too late if it were to initialize when the first tuple
* would need to return.
*/
List *citusCustomScanStates = FindCitusCustomScanStates(queryDesc->planstate);
CitusScanState *citusScanState = NULL;
foreach_ptr(citusScanState, citusCustomScanStates)
{
if (citusScanState->PreExecScan)
{
citusScanState->PreExecScan(citusScanState);
}
}
/* postgres will switch here again and will restore back on its own */
MemoryContextSwitchTo(oldcontext);
standard_ExecutorRun(queryDesc, direction, count, execute_once);
}
standard_ExecutorRun(queryDesc, direction, count, execute_once);
}
PG_FINALLY();
{
if (totalTime)
{
InstrStopNode(totalTime, queryDesc->estate->es_processed);
@@ -255,30 +281,12 @@ CitusExecutorRun(QueryDesc *queryDesc,
* transactions.
*/
CitusTableCacheFlushInvalidatedEntries();
InTopLevelDelegatedFunctionCall = false;
}
/*
* Within a 2PC, when a function is delegated to a remote node, we pin
* the distribution argument as the shard key for all the SQL in the
* function's block. The restriction is imposed to not to access other
* nodes from the current node, and violate the transactional integrity
* of the 2PC. Now that the query is ending, reset the shard key to NULL.
*/
CheckAndResetAllowedShardKeyValueIfNeeded();
}
PG_CATCH();
{
if (totalTime)
{
queryDesc->totaltime = totalTime;
}
executorBoundParams = savedBoundParams;
ExecutorLevel--;
if (ExecutorLevel == 0 && PlannerLevel == 0)
{
/*
* If we were in a delegated function call, signal that we are now
* done and restrictions that apply outside of such function calls
* once again apply.
*/
InTopLevelDelegatedFunctionCall = false;
}
@@ -287,8 +295,6 @@ CitusExecutorRun(QueryDesc *queryDesc,
* details see the function header.
*/
CheckAndResetAllowedShardKeyValueIfNeeded();
PG_RE_THROW();
}
PG_END_TRY();
}