diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d2d7d9b23..4b5d2a72a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3430,10 +3430,7 @@ InitializeCopyShardState(CopyShardState *shardState, ereport(ERROR, (errmsg("could not connect to any active placements"))); } - if (hasRemoteCopy) - { - EnsureRemoteTaskExecutionAllowed(); - } + EnsureTaskExecutionAllowed(hasRemoteCopy); /* * We just error out and code execution should never reach to this diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 2b32916ee..9fd192c10 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1321,7 +1321,8 @@ StartDistributedExecution(DistributedExecution *execution) /* make sure we are not doing remote execution from within a task */ if (execution->remoteTaskList != NIL) { - EnsureRemoteTaskExecutionAllowed(); + bool isRemote = true; + EnsureTaskExecutionAllowed(isRemote); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 8c9b9d3d0..5c1f0981e 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -108,26 +108,26 @@ bool EnableLocalExecution = true; bool LogLocalCommands = false; -int LocalExecutorLevel = 0; +/* global variable that tracks whether the local execution is on a shard */ +uint64 LocalExecutorShardId = INVALID_SHARD_ID; static LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL; -static uint64 ExecuteLocalTaskListInternal(List *taskList, - ParamListInfo paramListInfo, - DistributedPlan *distributedPlan, - TupleDestination *defaultTupleDest, - bool isUtilityCommand); static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList); -static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, - TupleDestination *tupleDest, Task *task, - ParamListInfo paramListInfo); +static uint64 LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString, + TupleDestination *tupleDest, Task *task, + ParamListInfo paramListInfo); +static uint64 ExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString, + TupleDestination *tupleDest, Task *task, + ParamListInfo paramListInfo); static void RecordNonDistTableAccessesForTask(Task *task); static void LogLocalCommand(Task *task); static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tupleDest, Task *task); +static void LocallyExecuteUtilityTask(Task *task); static void ExecuteUdfTaskQuery(Query *localUdfCommandQuery); static void EnsureTransitionPossible(LocalExecutionStatus from, LocalExecutionStatus to); @@ -204,37 +204,7 @@ ExecuteLocalTaskListExtended(List *taskList, TupleDestination *defaultTupleDest, bool isUtilityCommand) { - uint64 totalRowsProcessed = 0; ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); - - LocalExecutorLevel++; - PG_TRY(); - { - totalRowsProcessed = ExecuteLocalTaskListInternal(taskList, paramListInfo, - distributedPlan, - defaultTupleDest, - isUtilityCommand); - } - PG_CATCH(); - { - LocalExecutorLevel--; - - PG_RE_THROW(); - } - PG_END_TRY(); - LocalExecutorLevel--; - - return totalRowsProcessed; -} - - -static uint64 -ExecuteLocalTaskListInternal(List *taskList, - ParamListInfo paramListInfo, - DistributedPlan *distributedPlan, - TupleDestination *defaultTupleDest, - bool isUtilityCommand) -{ uint64 totalRowsProcessed = 0; int numParams = 0; Oid *parameterTypes = NULL; @@ -250,6 +220,12 @@ ExecuteLocalTaskListInternal(List *taskList, numParams = paramListInfo->numParams; } + if (taskList != NIL) + { + bool isRemote = false; + EnsureTaskExecutionAllowed(isRemote); + } + /* * Use a new memory context that gets reset after every task to free * the deparsed query string and query plan. @@ -291,7 +267,7 @@ ExecuteLocalTaskListInternal(List *taskList, if (isUtilityCommand) { - ExecuteUtilityCommand(TaskQueryString(task)); + LocallyExecuteUtilityTask(task); MemoryContextSwitchTo(oldContext); MemoryContextReset(loopContext); @@ -378,8 +354,8 @@ ExecuteLocalTaskListInternal(List *taskList, } totalRowsProcessed += - ExecuteLocalTaskPlan(localPlan, shardQueryString, - tupleDest, task, paramListInfo); + LocallyExecuteTaskPlan(localPlan, shardQueryString, + tupleDest, task, paramListInfo); MemoryContextSwitchTo(oldContext); MemoryContextReset(loopContext); @@ -408,9 +384,9 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tuple ParamListInfo paramListInfo = NULL; PlannedStmt *localPlan = planner_compat(shardQuery, cursorOptions, paramListInfo); - totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, - tupleDest, task, - paramListInfo); + totalProcessedRows += LocallyExecuteTaskPlan(localPlan, queryString, + tupleDest, task, + paramListInfo); } return totalProcessedRows; } @@ -431,6 +407,39 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT } +/* + * LocallyExecuteUtilityTask runs a utility command via local execution. + */ +static void +LocallyExecuteUtilityTask(Task *task) +{ + /* + * If we roll back to a savepoint, we may no longer be in a query on + * a shard. Reset the value as we go back up the stack. + */ + uint64 prevLocalExecutorShardId = LocalExecutorShardId; + + if (task->anchorShardId != INVALID_SHARD_ID) + { + LocalExecutorShardId = task->anchorShardId; + } + + PG_TRY(); + { + ExecuteUtilityCommand(TaskQueryString(task)); + } + PG_CATCH(); + { + LocalExecutorShardId = prevLocalExecutorShardId; + + PG_RE_THROW(); + } + PG_END_TRY(); + + LocalExecutorShardId = prevLocalExecutorShardId; +} + + /* * ExecuteUtilityCommand executes the given task query in the current * session. @@ -617,9 +626,50 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement * case of DML. */ static uint64 -ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, - TupleDestination *tupleDest, Task *task, - ParamListInfo paramListInfo) +LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString, + TupleDestination *tupleDest, Task *task, + ParamListInfo paramListInfo) +{ + volatile uint64 processedRows = 0; + + /* + * If we roll back to a savepoint, we may no longer be in a query on + * a shard. Reset the value as we go back up the stack. + */ + uint64 prevLocalExecutorShardId = LocalExecutorShardId; + + if (task->anchorShardId != INVALID_SHARD_ID) + { + LocalExecutorShardId = task->anchorShardId; + } + + PG_TRY(); + { + processedRows = ExecuteTaskPlan(taskPlan, queryString, tupleDest, task, + paramListInfo); + } + PG_CATCH(); + { + LocalExecutorShardId = prevLocalExecutorShardId; + + PG_RE_THROW(); + } + PG_END_TRY(); + + LocalExecutorShardId = prevLocalExecutorShardId; + + return processedRows; +} + + +/* + * ExecuteTaskPlan executes the given planned statement and writes the results + * to tupleDest. + */ +static uint64 +ExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString, + TupleDestination *tupleDest, Task *task, + ParamListInfo paramListInfo) { ScanDirection scanDirection = ForwardScanDirection; QueryEnvironment *queryEnv = create_queryEnv(); @@ -629,7 +679,7 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, RecordNonDistTableAccessesForTask(task); MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "ExecuteLocalTaskPlan", + "ExecuteTaskPlan", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 4c1fdf84a..9cbf065ae 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -51,6 +51,7 @@ #include "tcop/dest.h" #include "tcop/pquery.h" #include "tcop/utility.h" +#include "utils/fmgrprotos.h" #include "utils/snapmgr.h" #include "utils/memutils.h" @@ -63,6 +64,12 @@ int MultiShardConnectionType = PARALLEL_CONNECTION; bool WritableStandbyCoordinator = false; bool AllowModificationsFromWorkersToReplicatedTables = true; +/* + * Setting that controls whether distributed queries should be + * allowed within a task execution. + */ +bool AllowNestedDistributedExecution = false; + /* * Pointer to bound parameters of the current ongoing call to ExecutorRun. * If executor is not running, then this value is meaningless. @@ -88,6 +95,11 @@ static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static List * FindCitusCustomScanStates(PlanState *planState); static bool CitusCustomScanStateWalker(PlanState *planState, List **citusCustomScanStates); +static bool IsTaskExecutionAllowed(bool isRemote); +static bool InLocalTaskExecutionOnShard(void); +static bool MaybeInRemoteTaskExecution(void); +static bool InTrigger(void); + /* * CitusExecutorStart is the ExecutorStart_hook that gets called when @@ -871,43 +883,146 @@ ExecutorBoundParams(void) /* - * EnsureRemoteTaskExecutionAllowed ensures that we do not perform remote + * EnsureTaskExecutionAllowed ensures that we do not perform remote * execution from within a task. That could happen when the user calls * a function in a query that gets pushed down to the worker, and the * function performs a query on a distributed table. */ void -EnsureRemoteTaskExecutionAllowed(void) +EnsureTaskExecutionAllowed(bool isRemote) { - if (!InTaskExecution()) + if (IsTaskExecutionAllowed(isRemote)) { - /* we are not within a task, distributed execution is allowed */ return; } ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a " - "shard"))); + "shard"), + errdetail("Executing a distributed query in a function call that " + "may be pushed to a remote node can lead to incorrect " + "results."), + errhint("Avoid nesting of distributed queries or use alter user " + "current_user set citus.allow_nested_distributed_execution " + "to on to allow it with possible incorrectness."))); } /* - * InTaskExecution determines whether we are currently in a task execution. + * IsTaskExecutionAllowed determines whether task execution is currently allowed. + * In general, nested distributed execution is not allowed, except in a few cases + * (forced function call delegation, triggers). + * + * We distinguish between local and remote tasks because triggers only disallow + * remote task execution. */ -bool -InTaskExecution(void) +static bool +IsTaskExecutionAllowed(bool isRemote) { - if (LocalExecutorLevel > 0) + if (AllowNestedDistributedExecution) { - /* in a local task */ + /* user explicitly allows nested execution */ return true; } - /* - * Normally, any query execution within a citus-initiated backend - * is considered a task execution, but an exception is when we - * are in a delegated function/procedure call. - */ - return IsCitusInternalBackend() && - !InTopLevelDelegatedFunctionCall && - !InDelegatedProcedureCall; + if (!isRemote) + { + if (AllowedDistributionColumnValue.isActive) + { + /* + * When we are in a forced delegated function call, we explicitly check + * whether local tasks use the same distribution column value in + * EnsureForceDelegationDistributionKey. + */ + return true; + } + + if (InTrigger()) + { + /* + * In triggers on shards we only disallow remote tasks. This has a few + * reasons: + * + * - We want to enable access to co-located shards, but do not have additional + * checks yet. + * - Users need to explicitly set enable_unsafe_triggers in order to create + * triggers on distributed tables. + * - Triggers on Citus local tables should be able to access other Citus local + * tables. + */ + return true; + } + } + + return !InLocalTaskExecutionOnShard() && !MaybeInRemoteTaskExecution(); +} + + +/* + * InLocalTaskExecutionOnShard returns whether we are currently in the local executor + * and it is working on a shard of a distributed table. + * + * In general, we can allow distributed queries inside of local executor, because + * we can correctly assign tasks to connections. However, we preemptively protect + * against distributed queries inside of queries on shards of a distributed table, + * because those might start failing after a shard move. + */ +static bool +InLocalTaskExecutionOnShard(void) +{ + if (LocalExecutorShardId == INVALID_SHARD_ID) + { + /* local executor is not active or is processing a task without shards */ + return false; + } + + if (!DistributedTableShardId(LocalExecutorShardId)) + { + /* + * Local executor is processing a query on a shard, but the shard belongs + * to a reference table or Citus local table. We do not expect those to + * move. + */ + return false; + } + + return true; +} + + +/* + * MaybeInRemoteTaskExecution returns whether we could in a remote task execution. + * + * We consider anything that happens in a Citus-internal backend, except deleged + * function or procedure calls as a potential task execution. + * + * This function will also return true in other scenarios, such as during metadata + * syncing. However, since this function is mainly used for restricting (dangerous) + * nested executions, it is good to be pessimistic. + */ +static bool +MaybeInRemoteTaskExecution(void) +{ + if (!IsCitusInternalBackend()) + { + /* in a regular, client-initiated backend doing a regular task */ + return false; + } + + if (InTopLevelDelegatedFunctionCall || InDelegatedProcedureCall) + { + /* in a citus-initiated backend, but also in a delegated a procedure call */ + return false; + } + + return true; +} + + +/* + * InTrigger returns whether the execution is currently in a trigger. + */ +static bool +InTrigger(void) +{ + return DatumGetInt32(pg_trigger_depth(NULL)) > 0; } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 8f5ebae97..3e492db6e 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -719,6 +719,24 @@ ReferenceTableShardId(uint64 shardId) } +/* + * DistributedTableShardId returns true if the given shardId belongs to + * a distributed table. + */ +bool +DistributedTableShardId(uint64 shardId) +{ + if (shardId == INVALID_SHARD_ID) + { + return false; + } + + ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + return IsCitusTableTypeCacheEntry(tableEntry, DISTRIBUTED_TABLE); +} + + /* * LoadGroupShardPlacement returns the cached shard placement metadata * diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 79fb039c0..ca6a100ef 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -601,6 +601,23 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.allow_nested_distributed_execution", + gettext_noop("Enables distributed execution within a task " + "of another distributed execution."), + gettext_noop("Nested distributed execution can happen when Citus " + "pushes down a call to a user-defined function within " + "a distributed query, and the function contains another " + "distributed query. In this scenario, Citus makes no " + "guarantess with regards to correctness and it is therefore " + "disallowed by default. This setting can be used to allow " + "nested distributed execution."), + &AllowNestedDistributedExecution, + false, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.allow_unsafe_locks_from_workers", gettext_noop("Enables acquiring a distributed lock from a worker " diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index 9410d8ed8..9d041f4a9 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -153,8 +153,10 @@ ErrorIfRelationIsAKnownShard(Oid relationId) void ErrorIfIllegallyChangingKnownShard(Oid relationId) { - if (LocalExecutorLevel > 0 || - (IsCitusInternalBackend() || IsRebalancerInternalBackend()) || + /* allow Citus to make changes, and allow the user if explicitly enabled */ + if (LocalExecutorShardId != INVALID_SHARD_ID || + IsCitusInternalBackend() || + IsRebalancerInternalBackend() || EnableManualChangesToShards) { return; diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index d2b8cce9c..c555f1f82 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -19,7 +19,8 @@ extern bool EnableLocalExecution; extern bool LogLocalCommands; -extern int LocalExecutorLevel; +/* global variable that tracks whether the local execution is on a shard */ +extern uint64 LocalExecutorShardId; typedef enum LocalExecutionStatus { diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index e190aef6f..f01ab07d5 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -155,6 +155,7 @@ extern List * CitusTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern Oid RelationIdForShard(uint64 shardId); extern bool ReferenceTableShardId(uint64 shardId); +extern bool DistributedTableShardId(uint64 shardId); extern ShardPlacement * ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId); extern ShardPlacement * ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index dd10c511d..c8254bf44 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -61,6 +61,7 @@ typedef struct TransactionProperties } TransactionProperties; +extern bool AllowNestedDistributedExecution; extern int MultiShardConnectionType; extern bool WritableStandbyCoordinator; extern bool AllowModificationsFromWorkersToReplicatedTables; @@ -150,8 +151,7 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, const char ***parameterValues, bool useOriginalCustomTypeOids); extern ParamListInfo ExecutorBoundParams(void); -extern void EnsureRemoteTaskExecutionAllowed(void); -extern bool InTaskExecution(void); +extern void EnsureTaskExecutionAllowed(bool isRemote); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 2c958d041..d7a008054 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -75,6 +75,12 @@ typedef struct AllowedDistributionColumn int executorLevel; } AllowedDistributionColumn; +/* + * The current distribution column value passed as an argument to a forced + * function call delegation. + */ +extern AllowedDistributionColumn AllowedDistributionColumnValue; + /* * GUC that determines whether a SELECT in a transaction block should also run in * a transaction block on the worker. diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index b5a86c98e..be08a77fa 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -185,6 +185,7 @@ class PostgresConfig(CitusDefaultClusterConfig): self.new_settings = { "citus.use_citus_managed_tables": False, } + self.skip_tests = ["nested_execution"] class CitusSingleNodeClusterConfig(CitusDefaultClusterConfig): @@ -325,7 +326,7 @@ class CitusNonMxClusterConfig(CitusDefaultClusterConfig): self.is_mx = False # citus does not support distributed functions # when metadata is not synced - self.skip_tests = ["function_create", "functions"] + self.skip_tests = ["function_create", "functions", "nested_execution"] class PGUpgradeConfig(CitusBaseClusterConfig): diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule index 1be1b2360..f6e104f71 100644 --- a/src/test/regress/create_schedule +++ b/src/test/regress/create_schedule @@ -1,7 +1,7 @@ test: intermediate_result_pruning_create test: prepared_statements_create_load ch_benchmarks_create_load test: dropped_columns_create_load distributed_planning_create_load -test: local_dist_join_load +test: local_dist_join_load nested_execution_create test: partitioned_indexes_create test: connectivity_checks test: views_create diff --git a/src/test/regress/expected/citus_local_table_triggers.out b/src/test/regress/expected/citus_local_table_triggers.out index a5925cb25..97b93e756 100644 --- a/src/test/regress/expected/citus_local_table_triggers.out +++ b/src/test/regress/expected/citus_local_table_triggers.out @@ -423,7 +423,7 @@ NOTICE: executing the command locally: SELECT value FROM citus_local_table_trig (2 rows) ROLLBACK; --- cannot perform remote execution from a trigger on a Citus local table +-- can perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); @@ -444,7 +444,8 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 FOR EACH STATEMENT EXECUTE FUNCTION insert_100();') UPDATE another_citus_local_table SET value=value-1;; NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) -ERROR: cannot execute a distributed query from a query on a shard +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) ROLLBACK; -- can perform regular execution from a trigger on a Citus local table BEGIN; diff --git a/src/test/regress/expected/distributed_triggers.out b/src/test/regress/expected/distributed_triggers.out index b4ad001f5..2c1d71fe7 100644 --- a/src/test/regress/expected/distributed_triggers.out +++ b/src/test/regress/expected/distributed_triggers.out @@ -272,6 +272,8 @@ FOR EACH ROW EXECUTE FUNCTION distributed_triggers.bad_shardkey_record_change(); -- Query-on-distributed table exception should catch this INSERT INTO data VALUES ('hello6','world6','{"hello6":"world6"}'); ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" PL/pgSQL function bad_shardkey_record_change() line XX at SQL statement @@ -345,6 +347,8 @@ SELECT create_distributed_function( BEGIN; SELECT insert_document('hello7', 'world7'); ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP" PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement while executing command on localhost:xxxxx @@ -353,6 +357,8 @@ PL/pgSQL function insert_document(text,text) line XX at SQL statement END; SELECT insert_document('hello7', 'world7'); ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP" PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')" @@ -496,11 +502,15 @@ AFTER INSERT OR UPDATE OR DELETE ON emptest FOR EACH STATEMENT EXECUTE FUNCTION distributed_triggers.record_emp(); INSERT INTO emptest VALUES ('test5', 1); ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()" PL/pgSQL function record_emp() line XX at SQL statement while executing command on localhost:xxxxx DELETE FROM emptest; ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()" PL/pgSQL function distributed_triggers.record_emp() line XX at SQL statement while executing command on localhost:xxxxx @@ -538,6 +548,8 @@ FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change(); TRUNCATE TABLE data_changes; INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id ORDER BY change_id DESC LIMIT 1" @@ -545,6 +557,8 @@ PL/pgSQL function record_change() line XX at SQL statement while executing command on localhost:xxxxx INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}'); ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id ORDER BY change_id DESC LIMIT 1" @@ -583,6 +597,8 @@ SELECT create_reference_table('data_changes'); INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" PL/pgSQL function record_change() line XX at SQL statement diff --git a/src/test/regress/expected/forcedelegation_functions.out b/src/test/regress/expected/forcedelegation_functions.out index 1bb6b8ba7..103742c5e 100644 --- a/src/test/regress/expected/forcedelegation_functions.out +++ b/src/test/regress/expected/forcedelegation_functions.out @@ -222,19 +222,11 @@ ROLLBACK; BEGIN; -- Query gets delegated to the node of the shard xx_900001 for the key=1, -- and the function inserts value (1+17) locally on the shard xx_900031 +-- which is not allowed because this is not a regular pushdown SELECT insert_data(intcol+17) from test_forcepushdown where intcol = 1; - insert_data ---------------------------------------------------------------------- - -(1 row) - --- This will fail with duplicate error as the function already inserted --- the value(1+17) -SELECT insert_data(18); -DEBUG: pushing down function call in a multi-statement transaction -DEBUG: pushing down the function call -ERROR: duplicate key value violates unique constraint "test_forcepushdown_pkey_900031" -DETAIL: Key (intcol)=(18) already exists. +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (a)" PL/pgSQL function forcepushdown_schema.insert_data(integer) line XX at SQL statement while executing command on localhost:xxxxx @@ -524,19 +516,13 @@ END; -- BEGIN; -- Query lands on the shard with key = 300(shard __900089) and the function inserts locally +-- which is not allowed because this is not a regular pushdown SELECT inner_force_delegation_function(id) FROM test_nested WHERE id = 300; -NOTICE: inner_force_delegation_function():301 -DETAIL: from localhost:xxxxx - inner_force_delegation_function ---------------------------------------------------------------------- - 301 -(1 row) - --- Query lands on the shard with key = 300(shard __900089) and the function inserts remotely -SELECT insert_data_non_distarg(id) FROM test_nested WHERE id = 300; ERROR: cannot execute a distributed query from a query on a shard -CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (a+1)" -PL/pgSQL function forcepushdown_schema.insert_data_non_distarg(integer) line XX at SQL statement +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL statement "SELECT max(id)::numeric+1 FROM forcepushdown_schema.test_nested WHERE id = $1" +PL/pgSQL function forcepushdown_schema.inner_force_delegation_function(integer) line XX at SQL statement while executing command on localhost:xxxxx END; -- @@ -545,6 +531,17 @@ END; -- Param(PARAM_EXEC) node e.g. SELECT fn((SELECT col from test_nested where col=val)) BEGIN; SELECT inner_force_delegation_function((SELECT id+112 FROM test_nested WHERE id=400)); +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL statement "SELECT max(id)::numeric+1 FROM forcepushdown_schema.test_nested WHERE id = $1" +PL/pgSQL function forcepushdown_schema.inner_force_delegation_function(integer) line XX at SQL statement +while executing command on localhost:xxxxx +END; +BEGIN; +SET LOCAL citus.propagate_set_commands TO 'local'; +SET LOCAL citus.allow_nested_distributed_execution TO on; +SELECT inner_force_delegation_function((SELECT id+112 FROM test_nested WHERE id=400)); NOTICE: inner_force_delegation_function():513 DETAIL: from localhost:xxxxx inner_force_delegation_function @@ -697,6 +694,8 @@ SELECT insert_select_data(20); DEBUG: pushing down function call in a multi-statement transaction DEBUG: pushing down the function call ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.test_forcepushdown SELECT(a+1)" PL/pgSQL function forcepushdown_schema.insert_select_data(integer) line XX at SQL statement while executing command on localhost:xxxxx @@ -721,6 +720,8 @@ SELECT insert_select_data(22); DEBUG: pushing down function call in a multi-statement transaction DEBUG: pushing down the function call ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.test_forcepushdown SELECT(a+1)" PL/pgSQL function forcepushdown_schema.insert_select_data(integer) line XX at SQL statement while executing command on localhost:xxxxx @@ -776,6 +777,8 @@ SELECT insert_select_data_nonlocal(41); DEBUG: pushing down function call in a multi-statement transaction DEBUG: pushing down the function call ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.test_forcepushdown(intcol) SELECT intcol FROM forcepushdown_schema.test_forcepushdown_noncolocate" PL/pgSQL function forcepushdown_schema.insert_select_data_nonlocal(integer) line XX at SQL statement @@ -1106,6 +1109,8 @@ SELECT select_data(100); DEBUG: pushing down function call in a multi-statement transaction DEBUG: pushing down the function call ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "SELECT result FROM forcepushdown_schema.test_subquery WHERE data = (SELECT data FROM forcepushdown_schema.test_subquery WHERE data = a)" PL/pgSQL function forcepushdown_schema.select_data(integer) line XX at SQL statement @@ -1223,6 +1228,8 @@ SELECT 1,2,3 FROM select_data(100); DEBUG: pushing down function call in a multi-statement transaction DEBUG: pushing down the function call ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "SELECT result FROM forcepushdown_schema.test_subquery WHERE data = (SELECT data FROM forcepushdown_schema.test_subquery WHERE data = a)" PL/pgSQL function forcepushdown_schema.select_data(integer) line XX at SQL statement diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 5ba1566dc..ffb35b08e 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -1,6 +1,7 @@ -- Test passing off function call to mx workers CREATE SCHEMA multi_mx_function_call_delegation; SET search_path TO multi_mx_function_call_delegation, public; +\set VERBOSITY terse SET citus.shard_replication_factor TO 2; -- This table requires specific settings, create before getting into things create table mx_call_dist_table_replica(id int, val int); @@ -112,8 +113,7 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (1 row) select multi_mx_function_call_delegation.mx_call_copy(2); -ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist -HINT: No function matches the given name and argument types. You might need to add explicit type casts. +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist at character 8 select squares(4); squares --------------------------------------------------------------------- @@ -133,7 +133,6 @@ select mx_call_func(2, 0); -- Mark both functions as distributed ... select create_distributed_function('mx_call_func(int,int)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -141,7 +140,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func_bigint is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -149,7 +147,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func_custom_types is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -157,7 +154,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('mx_call_func_copy(int)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func_copy is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -165,7 +161,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('squares(int)'); NOTICE: procedure multi_mx_function_call_delegation.squares is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -177,11 +172,7 @@ SET client_min_messages TO DEBUG1; select mx_call_func(2, 0); DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -223,7 +214,6 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', colocate_with := 'mx_call_dist_table_bigint'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -234,7 +224,6 @@ select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', ' colocate_with := 'mx_call_dist_table_2', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -299,11 +288,7 @@ begin; select mx_call_func(2, 0); DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -333,11 +318,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -352,11 +333,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -386,11 +363,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::re select mx_call_func(2, 0); DEBUG: cannot push down function call for replicated distributed tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -420,7 +393,6 @@ BEGIN ORDER BY 1, 2; END;$$; DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands -- before distribution ... select mx_call_func_tbl(10); DEBUG: function does not have co-located tables @@ -433,7 +405,6 @@ DEBUG: function does not have co-located tables -- after distribution ... select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -455,10 +426,8 @@ BEGIN RAISE EXCEPTION 'error'; END;$$; DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -600,6 +569,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port); -- worker backend caches inconsistent. Reconnect to coordinator to use -- new worker connections, hence new backends. \c - - - :master_port +\set VERBOSITY terse SET search_path to multi_mx_function_call_delegation, public; SET client_min_messages TO DEBUG1; SET citus.shard_replication_factor = 1; @@ -609,10 +579,8 @@ SET citus.shard_replication_factor = 1; CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands SELECT create_distributed_function('mx_call_add(int,int)', '$1'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -622,11 +590,7 @@ DETAIL: A command for a distributed function is run. To make sure subsequent co select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); DEBUG: arguments in a distributed function must not contain subqueries DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 35 @@ -636,11 +600,7 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment select mx_call_func(floor(random())::int, 2); DEBUG: arguments in a distributed function must be constant expressions DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 27 @@ -649,28 +609,16 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment -- test forms we don't distribute select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- (0 rows) select mx_call_func(2, 0), mx_call_func(0, 2); DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func | mx_call_func --------------------------------------------------------------------- 29 | 27 @@ -732,24 +680,12 @@ DEBUG: pushing down the function call -- that result in remote execution from workers select mx_call_func(id, 0) from mx_call_dist_table_1; ERROR: cannot execute a distributed query from a query on a shard -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment -while executing command on localhost:xxxxx select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; ERROR: cannot execute a distributed query from a query on a shard -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment -while executing command on localhost:xxxxx select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; ERROR: cannot execute a distributed query from a query on a shard -CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 - SELECT s,s FROM generate_series(100, 110) s" -PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement -while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction -CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" -PL/pgSQL function inline_code_block line XX at PERFORM SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; id | val --------------------------------------------------------------------- @@ -816,11 +752,11 @@ DEBUG: pushing down the function call (1 row) \c - - - :worker_1_port +\set VERBOSITY terse SET search_path TO multi_mx_function_call_delegation, public; -- create_distributed_function is disallowed from worker nodes select create_distributed_function('mx_call_func(int,int)'); ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); @@ -835,11 +771,7 @@ BEGIN; SELECT mx_call_func(2, 0); DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -853,18 +785,8 @@ BEGIN END; $$ LANGUAGE plpgsql; DEBUG: not pushing down function calls in a multi-statement transaction -CONTEXT: SQL statement "SELECT mx_call_func(2, 0)" -PL/pgSQL function inline_code_block line XX at PERFORM DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment -SQL statement "SELECT mx_call_func(2, 0)" -PL/pgSQL function inline_code_block line XX at PERFORM DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer -CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment -SQL statement "SELECT mx_call_func(2, 0)" -PL/pgSQL function inline_code_block line XX at PERFORM -- forced calls are delegated in a transaction block BEGIN; SELECT mx_call_func_bigint_force(4, 2); @@ -883,12 +805,9 @@ BEGIN END; $$ LANGUAGE plpgsql; DEBUG: pushing down function call in a multi-statement transaction -CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" -PL/pgSQL function inline_code_block line XX at PERFORM DEBUG: pushing down the function call -CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" -PL/pgSQL function inline_code_block line XX at PERFORM \c - - - :master_port +\set VERBOSITY terse SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 2d317b34e..6706ec6f8 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -1,6 +1,7 @@ -- Test passing off function call to mx workers CREATE SCHEMA multi_mx_function_call_delegation; SET search_path TO multi_mx_function_call_delegation, public; +\set VERBOSITY terse SET citus.shard_replication_factor TO 2; -- This table requires specific settings, create before getting into things create table mx_call_dist_table_replica(id int, val int); @@ -112,8 +113,7 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (1 row) select multi_mx_function_call_delegation.mx_call_copy(2); -ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist -HINT: No function matches the given name and argument types. You might need to add explicit type casts. +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist at character 8 select squares(4); squares --------------------------------------------------------------------- @@ -133,7 +133,6 @@ select mx_call_func(2, 0); -- Mark both functions as distributed ... select create_distributed_function('mx_call_func(int,int)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -141,7 +140,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func_bigint is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -149,7 +147,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func_custom_types is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -157,7 +154,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('mx_call_func_copy(int)'); NOTICE: procedure multi_mx_function_call_delegation.mx_call_func_copy is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -165,7 +161,6 @@ DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] select create_distributed_function('squares(int)'); NOTICE: procedure multi_mx_function_call_delegation.squares is already distributed -DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- @@ -177,11 +172,7 @@ SET client_min_messages TO DEBUG1; select mx_call_func(2, 0); DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -223,7 +214,6 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', colocate_with := 'mx_call_dist_table_bigint'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -234,7 +224,6 @@ select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', ' colocate_with := 'mx_call_dist_table_2', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -299,11 +288,7 @@ begin; select mx_call_func(2, 0); DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -333,11 +318,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -352,11 +333,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -386,11 +363,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::re select mx_call_func(2, 0); DEBUG: cannot push down function call for replicated distributed tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 29 @@ -420,7 +393,6 @@ BEGIN ORDER BY 1, 2; END;$$; DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands -- before distribution ... select mx_call_func_tbl(10); DEBUG: function does not have co-located tables @@ -433,7 +405,6 @@ DEBUG: function does not have co-located tables -- after distribution ... select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -455,10 +426,8 @@ BEGIN RAISE EXCEPTION 'error'; END;$$; DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -600,6 +569,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port); -- worker backend caches inconsistent. Reconnect to coordinator to use -- new worker connections, hence new backends. \c - - - :master_port +\set VERBOSITY terse SET search_path to multi_mx_function_call_delegation, public; SET client_min_messages TO DEBUG1; SET citus.shard_replication_factor = 1; @@ -609,10 +579,8 @@ SET citus.shard_replication_factor = 1; CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands SELECT create_distributed_function('mx_call_add(int,int)', '$1'); DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -622,11 +590,7 @@ DETAIL: A command for a distributed function is run. To make sure subsequent co select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); DEBUG: arguments in a distributed function must not contain subqueries DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 35 @@ -636,11 +600,7 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment select mx_call_func(floor(random())::int, 2); DEBUG: arguments in a distributed function must be constant expressions DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 27 @@ -649,28 +609,16 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment -- test forms we don't distribute select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- (0 rows) select mx_call_func(2, 0), mx_call_func(0, 2); DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func | mx_call_func --------------------------------------------------------------------- 29 | 27 @@ -732,24 +680,12 @@ DEBUG: pushing down the function call -- that result in remote execution from workers select mx_call_func(id, 0) from mx_call_dist_table_1; ERROR: cannot execute a distributed query from a query on a shard -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment -while executing command on localhost:xxxxx select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; ERROR: cannot execute a distributed query from a query on a shard -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment -while executing command on localhost:xxxxx select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; ERROR: cannot execute a distributed query from a query on a shard -CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 - SELECT s,s FROM generate_series(100, 110) s" -PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement -while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction -CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" -PL/pgSQL function inline_code_block line XX at PERFORM SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; id | val --------------------------------------------------------------------- @@ -816,11 +752,11 @@ DEBUG: pushing down the function call (1 row) \c - - - :worker_1_port +\set VERBOSITY terse SET search_path TO multi_mx_function_call_delegation, public; -- create_distributed_function is disallowed from worker nodes select create_distributed_function('mx_call_func(int,int)'); ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); @@ -835,11 +771,7 @@ BEGIN; SELECT mx_call_func(2, 0); DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -853,18 +785,8 @@ BEGIN END; $$ LANGUAGE plpgsql; DEBUG: not pushing down function calls in a multi-statement transaction -CONTEXT: SQL statement "SELECT mx_call_func(2, 0)" -PL/pgSQL function inline_code_block line XX at PERFORM DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment -SQL statement "SELECT mx_call_func(2, 0)" -PL/pgSQL function inline_code_block line XX at PERFORM DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line XX at assignment -SQL statement "SELECT mx_call_func(2, 0)" -PL/pgSQL function inline_code_block line XX at PERFORM -- forced calls are delegated in a transaction block BEGIN; SELECT mx_call_func_bigint_force(4, 2); @@ -883,12 +805,9 @@ BEGIN END; $$ LANGUAGE plpgsql; DEBUG: pushing down function call in a multi-statement transaction -CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" -PL/pgSQL function inline_code_block line XX at PERFORM DEBUG: pushing down the function call -CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" -PL/pgSQL function inline_code_block line XX at PERFORM \c - - - :master_port +\set VERBOSITY terse SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse diff --git a/src/test/regress/expected/nested_execution.out b/src/test/regress/expected/nested_execution.out new file mode 100644 index 000000000..6ac7fe640 --- /dev/null +++ b/src/test/regress/expected/nested_execution.out @@ -0,0 +1,111 @@ +SET search_path TO nested_execution; +SET citus.enable_local_execution TO on; +\set VERBOSITY terse +-- nested execution from queries on distributed tables is generally disallowed +SELECT dist_query_single_shard(key) FROM distributed WHERE key = 1; +ERROR: cannot execute a distributed query from a query on a shard +SELECT dist_query_multi_shard() FROM distributed WHERE key = 1; +ERROR: cannot execute a distributed query from a query on a shard +SELECT ref_query() FROM distributed WHERE key = 1; +ERROR: cannot execute a distributed query from a query on a shard +SELECT dist_query_single_shard(key) FROM distributed LIMIT 1; +ERROR: cannot execute a distributed query from a query on a shard +SELECT dist_query_multi_shard() FROM distributed LIMIT 1; +ERROR: cannot execute a distributed query from a query on a shard +SELECT ref_query() FROM distributed LIMIT 1; +ERROR: cannot execute a distributed query from a query on a shard +-- nested execution is allowed outside of an aggregate +-- note that this behaviour is different if distributed has only 1 shard +-- however, this test always uses 4 shards +SELECT dist_query_single_shard(count(*)::int) FROM distributed; + dist_query_single_shard +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT dist_query_multi_shard()+count(*) FROM distributed; + ?column? +--------------------------------------------------------------------- + 20 +(1 row) + +SELECT ref_query()+count(*) FROM distributed; + ?column? +--------------------------------------------------------------------- + 20 +(1 row) + +-- nested execution is allowed in a query that only has intermediate results +SELECT dist_query_single_shard(key) FROM (SELECT key FROM distributed LIMIT 1) s; + dist_query_single_shard +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT dist_query_multi_shard() FROM (SELECT key FROM distributed LIMIT 1) s; + dist_query_multi_shard +--------------------------------------------------------------------- + 10 +(1 row) + +SELECT ref_query() FROM (SELECT key FROM distributed LIMIT 1) s; + ref_query +--------------------------------------------------------------------- + 10 +(1 row) + +-- nested execution from queries on reference tables is generally allowed +SELECT dist_query_single_shard(id::int) FROM reference WHERE id = 1; + dist_query_single_shard +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT dist_query_multi_shard() FROM reference WHERE id = 1; + dist_query_multi_shard +--------------------------------------------------------------------- + 10 +(1 row) + +SELECT ref_query() FROM reference WHERE id = 1; + ref_query +--------------------------------------------------------------------- + 10 +(1 row) + +-- repeat checks in insert..select (somewhat different code path) +INSERT INTO distributed SELECT dist_query_single_shard(key) FROM distributed WHERE key = 1; +ERROR: cannot execute a distributed query from a query on a shard +INSERT INTO distributed SELECT dist_query_multi_shard() FROM distributed WHERE key = 1; +ERROR: cannot execute a distributed query from a query on a shard +INSERT INTO distributed SELECT ref_query() FROM distributed WHERE key = 1; +ERROR: cannot execute a distributed query from a query on a shard +INSERT INTO distributed SELECT dist_query_single_shard(key) FROM distributed LIMIT 1; +ERROR: cannot execute a distributed query from a query on a shard +INSERT INTO distributed SELECT dist_query_multi_shard() FROM distributed LIMIT 1; +ERROR: cannot execute a distributed query from a query on a shard +INSERT INTO distributed SELECT ref_query() FROM distributed LIMIT 1; +ERROR: cannot execute a distributed query from a query on a shard +BEGIN; +INSERT INTO distributed SELECT dist_query_single_shard(count(*)::int) FROM distributed; +INSERT INTO distributed SELECT dist_query_multi_shard()+count(*) FROM distributed; +INSERT INTO distributed SELECT ref_query()+count(*) FROM distributed; +ROLLBACK; +BEGIN; +INSERT INTO distributed SELECT dist_query_single_shard(key) FROM (SELECT key FROM distributed LIMIT 1) s; +INSERT INTO distributed SELECT dist_query_multi_shard() FROM (SELECT key FROM distributed LIMIT 1) s; +INSERT INTO distributed SELECT ref_query() FROM (SELECT key FROM distributed LIMIT 1) s; +ROLLBACK; +BEGIN; +INSERT INTO distributed SELECT dist_query_single_shard(id::int) FROM reference WHERE id = 1; +INSERT INTO distributed SELECT dist_query_multi_shard() FROM reference WHERE id = 1; +INSERT INTO distributed SELECT ref_query() FROM reference WHERE id = 1; +ROLLBACK; +-- nested execution without local execution is disallowed (not distinguishable from queries on shard) +SET citus.enable_local_execution TO off; +SELECT dist_query_single_shard(id::int) FROM reference WHERE id = 1; +ERROR: cannot execute a distributed query from a query on a shard +SELECT dist_query_multi_shard() FROM reference WHERE id = 1; +ERROR: cannot execute a distributed query from a query on a shard +SELECT ref_query() FROM reference WHERE id = 1; +ERROR: cannot execute a distributed query from a query on a shard diff --git a/src/test/regress/expected/nested_execution_create.out b/src/test/regress/expected/nested_execution_create.out new file mode 100644 index 000000000..889f08872 --- /dev/null +++ b/src/test/regress/expected/nested_execution_create.out @@ -0,0 +1,51 @@ +CREATE SCHEMA nested_execution; +SET search_path TO nested_execution; +-- some of the next_execution tests change for single shard +SET citus.shard_count TO 4; +CREATE TABLE distributed (key int, name text, + created_at timestamptz DEFAULT now()); +CREATE TABLE reference (id bigint PRIMARY KEY, title text); +SELECT create_distributed_table('distributed', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('reference'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed SELECT i, i::text, now() FROM generate_series(1,10)i; +INSERT INTO reference SELECT i, i::text FROM generate_series(1,10)i; +CREATE FUNCTION dist_query_single_shard(p_key int) +RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + result bigint; +BEGIN + SELECT count(*) INTO result FROM nested_execution.distributed WHERE key = p_key; + RETURN result; +END; +$$; +CREATE FUNCTION dist_query_multi_shard() +RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + result bigint; +BEGIN + SELECT count(*) INTO result FROM nested_execution.distributed; + RETURN result; +END; +$$; +CREATE FUNCTION ref_query() +RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + result bigint; +BEGIN + SELECT count(*) INTO result FROM nested_execution.reference; + RETURN result; +END; +$$; diff --git a/src/test/regress/sql/citus_local_table_triggers.sql b/src/test/regress/sql/citus_local_table_triggers.sql index 95b700a43..ed53fefad 100644 --- a/src/test/regress/sql/citus_local_table_triggers.sql +++ b/src/test/regress/sql/citus_local_table_triggers.sql @@ -292,7 +292,7 @@ BEGIN; SELECT * FROM reference_table; ROLLBACK; --- cannot perform remote execution from a trigger on a Citus local table +-- can perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); diff --git a/src/test/regress/sql/forcedelegation_functions.sql b/src/test/regress/sql/forcedelegation_functions.sql index d95cfc75a..8ec2f9ac8 100644 --- a/src/test/regress/sql/forcedelegation_functions.sql +++ b/src/test/regress/sql/forcedelegation_functions.sql @@ -128,11 +128,8 @@ ROLLBACK; BEGIN; -- Query gets delegated to the node of the shard xx_900001 for the key=1, -- and the function inserts value (1+17) locally on the shard xx_900031 +-- which is not allowed because this is not a regular pushdown SELECT insert_data(intcol+17) from test_forcepushdown where intcol = 1; - --- This will fail with duplicate error as the function already inserted --- the value(1+17) -SELECT insert_data(18); COMMIT; -- @@ -278,11 +275,9 @@ END; BEGIN; -- Query lands on the shard with key = 300(shard __900089) and the function inserts locally +-- which is not allowed because this is not a regular pushdown SELECT inner_force_delegation_function(id) FROM test_nested WHERE id = 300; --- Query lands on the shard with key = 300(shard __900089) and the function inserts remotely -SELECT insert_data_non_distarg(id) FROM test_nested WHERE id = 300; - END; -- @@ -294,6 +289,12 @@ BEGIN; SELECT inner_force_delegation_function((SELECT id+112 FROM test_nested WHERE id=400)); END; +BEGIN; +SET LOCAL citus.propagate_set_commands TO 'local'; +SET LOCAL citus.allow_nested_distributed_execution TO on; +SELECT inner_force_delegation_function((SELECT id+112 FROM test_nested WHERE id=400)); +END; + CREATE OR REPLACE FUNCTION test_non_constant(x int, y bigint) RETURNS int AS $$ diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index 0efcea922..e4eeaebc2 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -2,6 +2,7 @@ CREATE SCHEMA multi_mx_function_call_delegation; SET search_path TO multi_mx_function_call_delegation, public; +\set VERBOSITY terse SET citus.shard_replication_factor TO 2; @@ -256,6 +257,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port); -- worker backend caches inconsistent. Reconnect to coordinator to use -- new worker connections, hence new backends. \c - - - :master_port +\set VERBOSITY terse SET search_path to multi_mx_function_call_delegation, public; SET client_min_messages TO DEBUG1; SET citus.shard_replication_factor = 1; @@ -310,6 +312,7 @@ EXECUTE call_plan(2, 0); EXECUTE call_plan(2, 0); \c - - - :worker_1_port +\set VERBOSITY terse SET search_path TO multi_mx_function_call_delegation, public; -- create_distributed_function is disallowed from worker nodes select create_distributed_function('mx_call_func(int,int)'); @@ -343,6 +346,7 @@ END; $$ LANGUAGE plpgsql; \c - - - :master_port +\set VERBOSITY terse SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; diff --git a/src/test/regress/sql/nested_execution.sql b/src/test/regress/sql/nested_execution.sql new file mode 100644 index 000000000..4d0d9336d --- /dev/null +++ b/src/test/regress/sql/nested_execution.sql @@ -0,0 +1,63 @@ +SET search_path TO nested_execution; +SET citus.enable_local_execution TO on; +\set VERBOSITY terse + +-- nested execution from queries on distributed tables is generally disallowed +SELECT dist_query_single_shard(key) FROM distributed WHERE key = 1; +SELECT dist_query_multi_shard() FROM distributed WHERE key = 1; +SELECT ref_query() FROM distributed WHERE key = 1; + +SELECT dist_query_single_shard(key) FROM distributed LIMIT 1; +SELECT dist_query_multi_shard() FROM distributed LIMIT 1; +SELECT ref_query() FROM distributed LIMIT 1; + +-- nested execution is allowed outside of an aggregate +-- note that this behaviour is different if distributed has only 1 shard +-- however, this test always uses 4 shards +SELECT dist_query_single_shard(count(*)::int) FROM distributed; +SELECT dist_query_multi_shard()+count(*) FROM distributed; +SELECT ref_query()+count(*) FROM distributed; + +-- nested execution is allowed in a query that only has intermediate results +SELECT dist_query_single_shard(key) FROM (SELECT key FROM distributed LIMIT 1) s; +SELECT dist_query_multi_shard() FROM (SELECT key FROM distributed LIMIT 1) s; +SELECT ref_query() FROM (SELECT key FROM distributed LIMIT 1) s; + +-- nested execution from queries on reference tables is generally allowed +SELECT dist_query_single_shard(id::int) FROM reference WHERE id = 1; +SELECT dist_query_multi_shard() FROM reference WHERE id = 1; +SELECT ref_query() FROM reference WHERE id = 1; + +-- repeat checks in insert..select (somewhat different code path) +INSERT INTO distributed SELECT dist_query_single_shard(key) FROM distributed WHERE key = 1; +INSERT INTO distributed SELECT dist_query_multi_shard() FROM distributed WHERE key = 1; +INSERT INTO distributed SELECT ref_query() FROM distributed WHERE key = 1; + +INSERT INTO distributed SELECT dist_query_single_shard(key) FROM distributed LIMIT 1; +INSERT INTO distributed SELECT dist_query_multi_shard() FROM distributed LIMIT 1; +INSERT INTO distributed SELECT ref_query() FROM distributed LIMIT 1; + +BEGIN; +INSERT INTO distributed SELECT dist_query_single_shard(count(*)::int) FROM distributed; +INSERT INTO distributed SELECT dist_query_multi_shard()+count(*) FROM distributed; +INSERT INTO distributed SELECT ref_query()+count(*) FROM distributed; +ROLLBACK; + +BEGIN; +INSERT INTO distributed SELECT dist_query_single_shard(key) FROM (SELECT key FROM distributed LIMIT 1) s; +INSERT INTO distributed SELECT dist_query_multi_shard() FROM (SELECT key FROM distributed LIMIT 1) s; +INSERT INTO distributed SELECT ref_query() FROM (SELECT key FROM distributed LIMIT 1) s; +ROLLBACK; + +BEGIN; +INSERT INTO distributed SELECT dist_query_single_shard(id::int) FROM reference WHERE id = 1; +INSERT INTO distributed SELECT dist_query_multi_shard() FROM reference WHERE id = 1; +INSERT INTO distributed SELECT ref_query() FROM reference WHERE id = 1; +ROLLBACK; + +-- nested execution without local execution is disallowed (not distinguishable from queries on shard) +SET citus.enable_local_execution TO off; + +SELECT dist_query_single_shard(id::int) FROM reference WHERE id = 1; +SELECT dist_query_multi_shard() FROM reference WHERE id = 1; +SELECT ref_query() FROM reference WHERE id = 1; diff --git a/src/test/regress/sql/nested_execution_create.sql b/src/test/regress/sql/nested_execution_create.sql new file mode 100644 index 000000000..1c7cfa638 --- /dev/null +++ b/src/test/regress/sql/nested_execution_create.sql @@ -0,0 +1,48 @@ +CREATE SCHEMA nested_execution; +SET search_path TO nested_execution; + +-- some of the next_execution tests change for single shard +SET citus.shard_count TO 4; + +CREATE TABLE distributed (key int, name text, + created_at timestamptz DEFAULT now()); +CREATE TABLE reference (id bigint PRIMARY KEY, title text); + +SELECT create_distributed_table('distributed', 'key'); +SELECT create_reference_table('reference'); + +INSERT INTO distributed SELECT i, i::text, now() FROM generate_series(1,10)i; +INSERT INTO reference SELECT i, i::text FROM generate_series(1,10)i; + +CREATE FUNCTION dist_query_single_shard(p_key int) +RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + result bigint; +BEGIN + SELECT count(*) INTO result FROM nested_execution.distributed WHERE key = p_key; + RETURN result; +END; +$$; + +CREATE FUNCTION dist_query_multi_shard() +RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + result bigint; +BEGIN + SELECT count(*) INTO result FROM nested_execution.distributed; + RETURN result; +END; +$$; + +CREATE FUNCTION ref_query() +RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + result bigint; +BEGIN + SELECT count(*) INTO result FROM nested_execution.reference; + RETURN result; +END; +$$; diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule index 4a1543814..8e26348a6 100644 --- a/src/test/regress/sql_schedule +++ b/src/test/regress/sql_schedule @@ -4,6 +4,6 @@ test: ch_benchmarks_1 ch_benchmarks_2 ch_benchmarks_3 test: ch_benchmarks_4 ch_benchmarks_5 ch_benchmarks_6 test: intermediate_result_pruning_queries_1 intermediate_result_pruning_queries_2 test: dropped_columns_1 distributed_planning -test: local_dist_join +test: local_dist_join nested_execution test: connectivity_checks citus_run_command test: sequences