diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 386a278b4..21471fd0d 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -157,14 +157,38 @@ void
 CitusExecutorRun(QueryDesc *queryDesc,
 				 ScanDirection direction, uint64 count, bool execute_once)
 {
-	DestReceiver *dest = queryDesc->dest;
+	/*
+	 * Disable execution of ALTER TABLE constraint validation queries. These
+	 * constraints will be validated in worker nodes, so running these queries
+	 * from the coordinator would be redundant.
+	 *
+	 * For example, ALTER TABLE ... ATTACH PARTITION checks that the new
+	 * partition doesn't violate constraints of the parent table, which
+	 * might involve running some SELECT queries.
+	 *
+	 * Ideally we'd completely skip these checks in the coordinator, but we don't
+	 * have any means to tell postgres to skip the checks. So the best we can do is
+	 * to not execute the queries and return an empty result set, as if this table has
+	 * no rows, so no constraints will be violated.
+	 */
+	if (AlterTableConstraintCheck(queryDesc))
+	{
+		EState *estate = queryDesc->estate;
 
-	ParamListInfo savedBoundParams = executorBoundParams;
+		estate->es_processed = 0;
+
+		/* start and shutdown tuple receiver to simulate empty result */
+		DestReceiver *dest = queryDesc->dest;
+		dest->rStartup(dest, CMD_SELECT, queryDesc->tupDesc);
+		dest->rShutdown(dest);
+		return;
+	}
 
 	/*
 	 * Save a pointer to query params so UDFs can access them by calling
 	 * ExecutorBoundParams().
 	 */
+	ParamListInfo savedBoundParams = executorBoundParams;
 	executorBoundParams = queryDesc->params;
 
 	/*
@@ -173,71 +197,73 @@ CitusExecutorRun(QueryDesc *queryDesc,
 	 * we remove the totaltime instrumentation from the queryDesc. Instead we will start
 	 * and stop the instrumentation of the total time and put it back on the queryDesc
 	 * before returning (or rethrowing) from this function.
+	 *
+	 * We include subplan execution (in PreExecScan) in the total time.
 	 */
 	Instrumentation *volatile totalTime = queryDesc->totaltime;
 	queryDesc->totaltime = NULL;
 
+	if (totalTime)
+	{
+		InstrStartNode(totalTime);
+	}
+
+	/* switch into per-query memory context before calling PreExecScan */
+	MemoryContext oldcontext = MemoryContextSwitchTo(
+		queryDesc->estate->es_query_cxt);
+
+	/*
+	 * Call PreExecScan for all citus custom scan nodes prior to starting the
+	 * postgres exec scan, to give those scan nodes a chance to initialize
+	 * state that would otherwise be set up too late, namely only once the
+	 * first tuple needs to be returned.
+	 */
+	List *citusCustomScanStates = FindCitusCustomScanStates(queryDesc->planstate);
+	CitusScanState *citusScanState = NULL;
+	foreach_ptr(citusScanState, citusCustomScanStates)
+	{
+		if (citusScanState->PreExecScan)
+		{
+			citusScanState->PreExecScan(citusScanState);
+		}
+	}
+
+	/* postgres will switch here again and will restore back on its own */
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * ExecutorLevel is used to detect nested execution and to decide
+	 * how to execute certain queries. For instance, if we execute a
+	 * multi-shard query that involves local shards, and it is in a nested
+	 * execution (e.g. a PL/pgSQL function), we choose local execution
+	 * instead of connecting to the local node.
+	 *
+	 * The reason for choosing local execution is that functions can have
+	 * multiple statements, and we could therefore see a mix of statements
+	 * that modify regular tables, Citus tables, and perform joins between
+	 * them. Such queries can only be answered when we use local execution
+	 * throughout the transaction.
+	 *
+	 * We should only raise ExecutorLevel after PreExecScan, which executes
+	 * subplans. Otherwise, top-level subplans will immediately consider
+	 * themselves to be nested and unnecessarily trigger local execution.
+	 */
+	ExecutorLevel++;
+
+	/*
+	 * We use a PG_TRY to reset our bookkeeping around ExecutorLevel and
+	 * executorBoundParams (used to know parameters in EXPLAIN ANALYZE).
+	 *
+	 * Execution can skip back up several levels and then resume in
+	 * case of savepoints, so we should do the bookkeeping at every
+	 * level instead of e.g. using the abort handler.
+	 */
 	PG_TRY();
 	{
-		ExecutorLevel++;
-
-		if (totalTime)
-		{
-			InstrStartNode(totalTime);
-		}
-
-		/*
-		 * Disable execution of ALTER TABLE constraint validation queries. These
-		 * constraints will be validated in worker nodes, so running these queries
-		 * from the coordinator would be redundant.
-		 *
-		 * For example, ALTER TABLE ... ATTACH PARTITION checks that the new
-		 * partition doesn't violate constraints of the parent table, which
-		 * might involve running some SELECT queries.
-		 *
-		 * Ideally we'd completely skip these checks in the coordinator, but we don't
-		 * have any means to tell postgres to skip the checks. So the best we can do is
-		 * to not execute the queries and return an empty result set, as if this table has
-		 * no rows, so no constraints will be violated.
-		 */
-		if (AlterTableConstraintCheck(queryDesc))
-		{
-			EState *estate = queryDesc->estate;
-
-			estate->es_processed = 0;
-
-			/* start and shutdown tuple receiver to simulate empty result */
-			dest->rStartup(queryDesc->dest, CMD_SELECT, queryDesc->tupDesc);
-			dest->rShutdown(dest);
-		}
-		else
-		{
-			/* switch into per-query memory context before calling PreExecScan */
-			MemoryContext oldcontext = MemoryContextSwitchTo(
-				queryDesc->estate->es_query_cxt);
-
-			/*
-			 * Call PreExecScan for all citus custom scan nodes prior to starting the
-			 * postgres exec scan to give some citus scan nodes some time to initialize
-			 * state that would be too late if it were to initialize when the first tuple
-			 * would need to return.
-			 */
-			List *citusCustomScanStates = FindCitusCustomScanStates(queryDesc->planstate);
-			CitusScanState *citusScanState = NULL;
-			foreach_ptr(citusScanState, citusCustomScanStates)
-			{
-				if (citusScanState->PreExecScan)
-				{
-					citusScanState->PreExecScan(citusScanState);
-				}
-			}
-
-			/* postgres will switch here again and will restore back on its own */
-			MemoryContextSwitchTo(oldcontext);
-
-			standard_ExecutorRun(queryDesc, direction, count, execute_once);
-		}
-
+		standard_ExecutorRun(queryDesc, direction, count, execute_once);
+	}
+	PG_FINALLY();
+	{
 		if (totalTime)
 		{
 			InstrStopNode(totalTime, queryDesc->estate->es_processed);
@@ -255,30 +281,12 @@ CitusExecutorRun(QueryDesc *queryDesc,
 			 * transactions.
 			 */
 			CitusTableCacheFlushInvalidatedEntries();
-			InTopLevelDelegatedFunctionCall = false;
-		}
 
-		/*
-		 * Within a 2PC, when a function is delegated to a remote node, we pin
-		 * the distribution argument as the shard key for all the SQL in the
-		 * function's block. The restriction is imposed to not to access other
-		 * nodes from the current node, and violate the transactional integrity
-		 * of the 2PC. Now that the query is ending, reset the shard key to NULL.
-		 */
-		CheckAndResetAllowedShardKeyValueIfNeeded();
-	}
-	PG_CATCH();
-	{
-		if (totalTime)
-		{
-			queryDesc->totaltime = totalTime;
-		}
-
-		executorBoundParams = savedBoundParams;
-		ExecutorLevel--;
-
-		if (ExecutorLevel == 0 && PlannerLevel == 0)
-		{
+			/*
+			 * If we were in a delegated function call, signal that we are now
+			 * done and that the restrictions that apply outside of such
+			 * function calls once again apply.
+			 */
 			InTopLevelDelegatedFunctionCall = false;
 		}
 
@@ -287,8 +295,6 @@ CitusExecutorRun(QueryDesc *queryDesc,
 		 * details see the function header.
 		 */
 		CheckAndResetAllowedShardKeyValueIfNeeded();
-
-		PG_RE_THROW();
 	}
 	PG_END_TRY();
 }
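
Reviewer note on the error-handling change: the patch collapses the old PG_TRY/PG_CATCH pair, in which every bookkeeping reset had to be written twice and the catch branch had to end in PG_RE_THROW(), into a single PG_FINALLY block, which PostgreSQL 13 and later runs on both the success path and the error path before re-throwing. Below is a minimal sketch of that pattern for backend/extension code; the MyExecutorLevel counter and RunWithLevelBookkeeping wrapper are illustrative names, not part of this patch:

```c
#include "postgres.h"

/* illustrative stand-in for the ExecutorLevel counter in the patch */
static int MyExecutorLevel = 0;

static void
RunWithLevelBookkeeping(void (*callback) (void))
{
	MyExecutorLevel++;

	PG_TRY();
	{
		callback();
	}
	PG_FINALLY();
	{
		/*
		 * This block runs on the success path and on the error path
		 * alike, so the reset is written exactly once. If an error was
		 * caught, PG_END_TRY() re-throws it after this block, so no
		 * explicit PG_RE_THROW() is needed (unlike with PG_CATCH).
		 */
		MyExecutorLevel--;
	}
	PG_END_TRY();
}
```

Note that, as in the patch, the increment must happen immediately before PG_TRY so that the decrement in the FINALLY body always matches exactly one increment, even when execution unwinds through several nested levels.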