mirror of https://github.com/citusdata/citus.git
Allows to execute some distributed stmt in functions
parent
034a86e2d4
commit
9c630f120f
|
@ -3475,7 +3475,7 @@ InitializeCopyShardState(CopyShardState *shardState,
|
||||||
ereport(ERROR, (errmsg("could not connect to any active placements")));
|
ereport(ERROR, (errmsg("could not connect to any active placements")));
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureTaskExecutionAllowed(hasRemoteCopy);
|
EnsureTaskExecutionAllowed(hasRemoteCopy, true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We just error out and code execution should never reach to this
|
* We just error out and code execution should never reach to this
|
||||||
|
|
|
@ -1385,7 +1385,7 @@ StartDistributedExecution(DistributedExecution *execution)
|
||||||
if (execution->remoteTaskList != NIL)
|
if (execution->remoteTaskList != NIL)
|
||||||
{
|
{
|
||||||
bool isRemote = true;
|
bool isRemote = true;
|
||||||
EnsureTaskExecutionAllowed(isRemote);
|
EnsureTaskExecutionAllowed(isRemote, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,7 +230,24 @@ ExecuteLocalTaskListExtended(List *taskList,
|
||||||
if (taskList != NIL)
|
if (taskList != NIL)
|
||||||
{
|
{
|
||||||
bool isRemote = false;
|
bool isRemote = false;
|
||||||
EnsureTaskExecutionAllowed(isRemote);
|
if (!EnsureTaskExecutionAllowed(isRemote, false))
|
||||||
|
{
|
||||||
|
/* instead of erroring, let's check further */
|
||||||
|
Task *task = NULL;
|
||||||
|
foreach_ptr(task, taskList)
|
||||||
|
{
|
||||||
|
if (!task->safeToPush)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errmsg("cannot execute a distributed query from a query on a "
|
||||||
|
"shard"),
|
||||||
|
errdetail("Executing a distributed query in a function call that "
|
||||||
|
"may be pushed to a remote node can lead to incorrect "
|
||||||
|
"results."),
|
||||||
|
errhint("Avoid nesting of distributed queries or use alter user "
|
||||||
|
"current_user set citus.allow_nested_distributed_execution "
|
||||||
|
"to on to allow it with possible incorrectness.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -899,15 +899,17 @@ ExecutorBoundParams(void)
|
||||||
* a function in a query that gets pushed down to the worker, and the
|
* a function in a query that gets pushed down to the worker, and the
|
||||||
* function performs a query on a distributed table.
|
* function performs a query on a distributed table.
|
||||||
*/
|
*/
|
||||||
void
|
bool
|
||||||
EnsureTaskExecutionAllowed(bool isRemote)
|
EnsureTaskExecutionAllowed(bool isRemote, bool shouldError)
|
||||||
{
|
{
|
||||||
if (IsTaskExecutionAllowed(isRemote))
|
if (IsTaskExecutionAllowed(isRemote))
|
||||||
{
|
{
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a "
|
if (shouldError)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errmsg("cannot execute a distributed query from a query on a "
|
||||||
"shard"),
|
"shard"),
|
||||||
errdetail("Executing a distributed query in a function call that "
|
errdetail("Executing a distributed query in a function call that "
|
||||||
"may be pushed to a remote node can lead to incorrect "
|
"may be pushed to a remote node can lead to incorrect "
|
||||||
|
@ -915,6 +917,8 @@ EnsureTaskExecutionAllowed(bool isRemote)
|
||||||
errhint("Avoid nesting of distributed queries or use alter user "
|
errhint("Avoid nesting of distributed queries or use alter user "
|
||||||
"current_user set citus.allow_nested_distributed_execution "
|
"current_user set citus.allow_nested_distributed_execution "
|
||||||
"to on to allow it with possible incorrectness.")));
|
"to on to allow it with possible incorrectness.")));
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1793,6 +1793,8 @@ CreateTask(TaskType taskType)
|
||||||
task->partiallyLocalOrRemote = false;
|
task->partiallyLocalOrRemote = false;
|
||||||
task->relationShardList = NIL;
|
task->relationShardList = NIL;
|
||||||
|
|
||||||
|
task->safeToPush = false;
|
||||||
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2177,6 +2179,58 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
task->replicationModel = replicationModel;
|
task->replicationModel = replicationModel;
|
||||||
task->parametersInQueryStringResolved = parametersInQueryResolved;
|
task->parametersInQueryStringResolved = parametersInQueryResolved;
|
||||||
|
|
||||||
|
StringInfo sqlQueryString = makeStringInfo();
|
||||||
|
pg_get_query_def(query, sqlQueryString);
|
||||||
|
/* log the query string we generated */
|
||||||
|
ereport(DEBUG4, (errmsg("generated sql query for task %d", task->taskId),
|
||||||
|
errdetail("query string: \"%s\"",
|
||||||
|
sqlQueryString->data)));
|
||||||
|
|
||||||
|
// if (query->hasTargetSRFs)
|
||||||
|
// if (query->rtable)
|
||||||
|
// if (query->jointree)
|
||||||
|
// if (query->targetList)
|
||||||
|
// if (query->returningList)
|
||||||
|
/* Check the target list */
|
||||||
|
// task->safeToPush = true;
|
||||||
|
// ListCell *lc;
|
||||||
|
// bool foundUDF = false;
|
||||||
|
// foreach (lc, query->targetList)
|
||||||
|
// {
|
||||||
|
// TargetEntry *tle = (TargetEntry *) lfirst(lc);
|
||||||
|
// elog(DEBUG2, "walking target list");
|
||||||
|
// if (ContainsUDFWalker((Node *) tle->expr, &foundUDF))
|
||||||
|
// {
|
||||||
|
// task->safeToPush = false;
|
||||||
|
// elog(DEBUG2, "UNSAFE");
|
||||||
|
// // break;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
/* quick check first */
|
||||||
|
// if (colocationId) //FIXME include header for INVALID_COLOCATION_ID ?
|
||||||
|
// {
|
||||||
|
// goto exitnow;
|
||||||
|
// }
|
||||||
|
|
||||||
|
ListCell *lc;
|
||||||
|
bool foundNonReferenceTable = false;
|
||||||
|
foreach (lc, relationShardList)
|
||||||
|
{
|
||||||
|
RelationShard *relationShard = (RelationShard *) lfirst(lc);
|
||||||
|
// elog(DEBUG2, "walking relation shard list");
|
||||||
|
if (!IsCitusTableType(relationShard->relationId, REFERENCE_TABLE))
|
||||||
|
{
|
||||||
|
foundNonReferenceTable = true;
|
||||||
|
// elog(DEBUG2, "found UNSAFE");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!foundNonReferenceTable)
|
||||||
|
task->safeToPush = true;
|
||||||
|
|
||||||
|
exitnow:
|
||||||
|
|
||||||
return list_make1(task);
|
return list_make1(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -327,6 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS)
|
||||||
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
|
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
|
||||||
COPY_SCALAR_FIELD(isLocalTableModification);
|
COPY_SCALAR_FIELD(isLocalTableModification);
|
||||||
COPY_SCALAR_FIELD(cannotBeExecutedInTransaction);
|
COPY_SCALAR_FIELD(cannotBeExecutedInTransaction);
|
||||||
|
COPY_SCALAR_FIELD(safeToPush);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -536,6 +536,7 @@ OutTask(OUTFUNC_ARGS)
|
||||||
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
||||||
WRITE_BOOL_FIELD(isLocalTableModification);
|
WRITE_BOOL_FIELD(isLocalTableModification);
|
||||||
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
|
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
|
||||||
|
WRITE_BOOL_FIELD(safeToPush);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -153,7 +153,7 @@ extern void EnsureSequentialMode(ObjectType objType);
|
||||||
extern void SetLocalForceMaxQueryParallelization(void);
|
extern void SetLocalForceMaxQueryParallelization(void);
|
||||||
extern void SortTupleStore(CitusScanState *scanState);
|
extern void SortTupleStore(CitusScanState *scanState);
|
||||||
extern ParamListInfo ExecutorBoundParams(void);
|
extern ParamListInfo ExecutorBoundParams(void);
|
||||||
extern void EnsureTaskExecutionAllowed(bool isRemote);
|
extern bool EnsureTaskExecutionAllowed(bool isRemote, bool shouldError);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_EXECUTOR_H */
|
#endif /* MULTI_EXECUTOR_H */
|
||||||
|
|
|
@ -334,6 +334,9 @@ typedef struct Task
|
||||||
|
|
||||||
Const *partitionKeyValue;
|
Const *partitionKeyValue;
|
||||||
int colocationId;
|
int colocationId;
|
||||||
|
|
||||||
|
/* if it's granted this task nested statements are safe to be executed */
|
||||||
|
bool safeToPush;
|
||||||
} Task;
|
} Task;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue