mirror of https://github.com/citusdata/citus.git
Merge pull request #3553 from citusdata/refactor/begin_scan
Refactor CitusBeginScan into separate SELECT / DML pathspull/3564/head
commit
241c186603
|
@ -3185,7 +3185,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
||||||
session->currentTask = placementExecution;
|
session->currentTask = placementExecution;
|
||||||
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
|
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
|
||||||
|
|
||||||
if (paramListInfo != NULL)
|
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
|
||||||
{
|
{
|
||||||
int parameterCount = paramListInfo->numParams;
|
int parameterCount = paramListInfo->numParams;
|
||||||
Oid *parameterTypes = NULL;
|
Oid *parameterTypes = NULL;
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "distributed/citus_clauses.h"
|
#include "distributed/citus_clauses.h"
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/distributed_execution_locks.h"
|
#include "distributed/distributed_execution_locks.h"
|
||||||
#include "distributed/insert_select_executor.h"
|
#include "distributed/insert_select_executor.h"
|
||||||
|
@ -48,17 +49,17 @@ static Node * DelayedErrorCreateScan(CustomScan *scan);
|
||||||
|
|
||||||
/* functions that are common to different scans */
|
/* functions that are common to different scans */
|
||||||
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
|
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
static void CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate,
|
static void CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
int eflags);
|
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
static void CitusPreExecScan(CitusScanState *scanState);
|
static void CitusPreExecScan(CitusScanState *scanState);
|
||||||
static void HandleDeferredShardPruningForFastPathQueries(
|
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
|
||||||
DistributedPlan *distributedPlan);
|
static void RegenerateTaskListForInsert(Job *workerJob);
|
||||||
static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan);
|
static void CacheLocalPlanForShardQuery(Task *task,
|
||||||
static void CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan);
|
DistributedPlan *originalDistributedPlan);
|
||||||
static DistributedPlan * CopyDistributedPlanWithoutCache(CitusScanState *scanState);
|
static bool IsLocalPlanCachingSupported(Job *workerJob,
|
||||||
static void ResetExecutionParameters(EState *executorState);
|
DistributedPlan *originalDistributedPlan);
|
||||||
static void CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node,
|
static DistributedPlan * CopyDistributedPlanWithoutCache(
|
||||||
EState *estate, int eflags);
|
DistributedPlan *originalDistributedPlan);
|
||||||
static void CitusEndScan(CustomScanState *node);
|
static void CitusEndScan(CustomScanState *node);
|
||||||
static void CitusReScan(CustomScanState *node);
|
static void CitusReScan(CustomScanState *node);
|
||||||
|
|
||||||
|
@ -187,16 +188,22 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
Job *workerJob = distributedPlan->workerJob;
|
if (distributedPlan->insertSelectQuery != NULL)
|
||||||
if (workerJob &&
|
|
||||||
(workerJob->requiresMasterEvaluation || workerJob->deferredPruning))
|
|
||||||
{
|
{
|
||||||
CitusBeginScanWithCoordinatorProcessing(node, estate, eflags);
|
/*
|
||||||
|
* INSERT..SELECT via coordinator or re-partitioning are special because
|
||||||
|
* the SELECT part is planned separately.
|
||||||
|
*/
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
else if (distributedPlan->modLevel == ROW_MODIFY_READONLY)
|
||||||
CitusBeginScanWithoutCoordinatorProcessing(node, estate, eflags);
|
{
|
||||||
|
CitusBeginSelectScan(node, estate, eflags);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
CitusBeginModifyScan(node, estate, eflags);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -233,179 +240,176 @@ CitusExecScan(CustomScanState *node)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CitusBeginScanWithoutCoordinatorProcessing is intended to work on all executions
|
* CitusBeginSelectScan handles deferred pruning and plan caching for SELECTs.
|
||||||
* that do not require any coordinator processing. The function simply acquires the
|
|
||||||
* necessary locks on the shards involved in the task list of the distributed plan
|
|
||||||
* and does the placement assignements. This implies that the function is a no-op for
|
|
||||||
* SELECT queries as they do not require any locking and placement assignements.
|
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node, EState *estate, int
|
CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
eflags)
|
|
||||||
{
|
|
||||||
CitusScanState *scanState = (CitusScanState *) node;
|
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
|
||||||
|
|
||||||
if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
|
|
||||||
distributedPlan->insertSelectQuery != NULL)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* we'll be modifying the distributed plan by assigning taskList, do it on a copy */
|
|
||||||
distributedPlan = copyObject(distributedPlan);
|
|
||||||
scanState->distributedPlan = distributedPlan;
|
|
||||||
|
|
||||||
Job *workerJob = distributedPlan->workerJob;
|
|
||||||
List *taskList = workerJob->taskList;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* These more complex jobs should have been evaluated in
|
|
||||||
* CitusBeginScanWithCoordinatorProcessing.
|
|
||||||
*/
|
|
||||||
Assert(!(workerJob->requiresMasterEvaluation || workerJob->deferredPruning));
|
|
||||||
|
|
||||||
/* prevent concurrent placement changes */
|
|
||||||
AcquireMetadataLocks(taskList);
|
|
||||||
|
|
||||||
/* modify tasks are always assigned using first-replica policy */
|
|
||||||
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CitusBeginScanWithCoordinatorProcessing generates query strings at the start of the execution
|
|
||||||
* in two cases: when the query requires master evaluation and/or deferred shard pruning.
|
|
||||||
*
|
|
||||||
* The function is also smart about caching plans if the plan is local to this node.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, int eflags)
|
|
||||||
{
|
{
|
||||||
CitusScanState *scanState = (CitusScanState *) node;
|
CitusScanState *scanState = (CitusScanState *) node;
|
||||||
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
|
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
|
||||||
DistributedPlan *distributedPlan = CopyDistributedPlanWithoutCache(scanState);
|
|
||||||
Job *workerJob = distributedPlan->workerJob;
|
if (!originalDistributedPlan->workerJob->deferredPruning)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* For SELECT queries that have already been pruned we can proceed straight
|
||||||
|
* to execution, since none of the prepared statement logic applies.
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Create a copy of the generic plan for the current execution, but make a shallow
|
||||||
|
* copy of the plan cache. That means we'll be able to access the plan cache via
|
||||||
|
* currentPlan->workerJob->localPlannedStatements, but it will be preserved across
|
||||||
|
* executions by the prepared statement logic.
|
||||||
|
*/
|
||||||
|
DistributedPlan *currentPlan =
|
||||||
|
CopyDistributedPlanWithoutCache(originalDistributedPlan);
|
||||||
|
scanState->distributedPlan = currentPlan;
|
||||||
|
|
||||||
|
Job *workerJob = currentPlan->workerJob;
|
||||||
Query *jobQuery = workerJob->jobQuery;
|
Query *jobQuery = workerJob->jobQuery;
|
||||||
|
|
||||||
/* we'd only get to this function with the following conditions */
|
|
||||||
Assert(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
|
|
||||||
|
|
||||||
PlanState *planState = &(scanState->customScanState.ss.ps);
|
PlanState *planState = &(scanState->customScanState.ss.ps);
|
||||||
|
|
||||||
/* citus only evaluates functions for modification queries */
|
|
||||||
bool modifyQueryRequiresMasterEvaluation =
|
|
||||||
jobQuery->commandType != CMD_SELECT &&
|
|
||||||
(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExecuteMasterEvaluableFunctions handles both function evalation
|
* We only do deferred pruning for fast path queries, which have a single
|
||||||
* and parameter evaluation. Pruning is most likely deferred because
|
* partition column value.
|
||||||
* there is a parameter on the distribution key. So, evaluate in both
|
|
||||||
* cases.
|
|
||||||
*/
|
*/
|
||||||
if (modifyQueryRequiresMasterEvaluation)
|
Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);
|
||||||
{
|
|
||||||
/* evaluate functions and parameters for modification queries */
|
|
||||||
ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
|
|
||||||
}
|
|
||||||
else if (jobQuery->commandType == CMD_SELECT && !workerJob->deferredPruning)
|
|
||||||
{
|
|
||||||
/* we'll use generated strings, no need to have the parameters anymore */
|
|
||||||
EState *executorState = planState->state;
|
|
||||||
ResetExecutionParameters(executorState);
|
|
||||||
|
|
||||||
/* we're done, we don't want to evaluate functions for SELECT queries */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else if (jobQuery->commandType == CMD_SELECT && workerJob->deferredPruning)
|
|
||||||
{
|
|
||||||
/*
|
/*
|
||||||
* Evaluate parameters, because the parameters are only avaliable on the
|
* Evaluate parameters, because the parameters are only available on the
|
||||||
* coordinator and are required for pruning.
|
* coordinator and are required for pruning.
|
||||||
*
|
*
|
||||||
* But, we don't want to evaluate functions for read-only queries on the
|
* We don't evaluate functions for read-only queries on the coordinator
|
||||||
* coordinator as the volatile functions could yield different
|
* at the moment. Most function calls would be in a context where they
|
||||||
* results per shard (also per row) and could have side-effects.
|
* should be re-evaluated for every row in case of volatile functions.
|
||||||
*
|
*
|
||||||
* Note that Citus already errors out for modification queries during
|
* TODO: evaluate stable functions
|
||||||
* planning when the query involve any volatile function that might
|
|
||||||
* diverge the shards as such functions are expected to yield different
|
|
||||||
* results per shard (also per row).
|
|
||||||
*/
|
*/
|
||||||
ExecuteMasterEvaluableParameters(jobQuery, planState);
|
ExecuteMasterEvaluableParameters(jobQuery, planState);
|
||||||
}
|
|
||||||
|
/* job query no longer has parameters, so we should not send any */
|
||||||
|
workerJob->parametersInJobQueryResolved = true;
|
||||||
|
|
||||||
|
/* parameters are filled in, so we can generate a task for this execution */
|
||||||
|
RegenerateTaskForFasthPathQuery(workerJob);
|
||||||
|
|
||||||
|
if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
|
||||||
|
{
|
||||||
|
Task *task = linitial(workerJob->taskList);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* After evaluating the function/parameters, we're done unless shard pruning
|
* We are going to execute this task locally. If it's not already in
|
||||||
* is also deferred.
|
* the cache, create a local plan now and add it to the cache. During
|
||||||
|
* execution, we will get the plan from the cache.
|
||||||
|
*
|
||||||
|
* The plan will be cached across executions when originalDistributedPlan
|
||||||
|
* represents a prepared statement.
|
||||||
*/
|
*/
|
||||||
if (workerJob->requiresMasterEvaluation && !workerJob->deferredPruning)
|
CacheLocalPlanForShardQuery(task, originalDistributedPlan);
|
||||||
{
|
}
|
||||||
RebuildQueryStrings(workerJob->jobQuery, workerJob->taskList);
|
|
||||||
|
|
||||||
/* we'll use generated strings, no need to have the parameters anymore */
|
|
||||||
EState *executorState = planState->state;
|
|
||||||
ResetExecutionParameters(executorState);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CitusBeginModifyScan prepares the scan state for a modification.
|
||||||
|
*
|
||||||
|
* Modifications are special because:
|
||||||
|
* a) we evaluate function calls (e.g. nextval) here and the outcome may
|
||||||
|
* determine which shards are affected by this query.
|
||||||
|
* b) we need to take metadata locks to make sure no write is left behind
|
||||||
|
* when finalizing a shard move.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
|
{
|
||||||
|
CitusScanState *scanState = (CitusScanState *) node;
|
||||||
|
PlanState *planState = &(scanState->customScanState.ss.ps);
|
||||||
|
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
|
||||||
|
|
||||||
|
DistributedPlan *currentPlan =
|
||||||
|
CopyDistributedPlanWithoutCache(originalDistributedPlan);
|
||||||
|
scanState->distributedPlan = currentPlan;
|
||||||
|
|
||||||
|
Job *workerJob = currentPlan->workerJob;
|
||||||
|
Query *jobQuery = workerJob->jobQuery;
|
||||||
|
bool evaluateAllExpressions = workerJob->requiresMasterEvaluation ||
|
||||||
|
workerJob->deferredPruning;
|
||||||
|
|
||||||
|
if (evaluateAllExpressions)
|
||||||
|
{
|
||||||
|
/* evaluate both functions and parameters */
|
||||||
|
ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
|
||||||
|
|
||||||
|
/* job query no longer has parameters, so we should not send any */
|
||||||
|
workerJob->parametersInJobQueryResolved = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (workerJob->deferredPruning)
|
||||||
|
{
|
||||||
/*
|
/*
|
||||||
* At this point, we're about to do the shard pruning for fast-path queries.
|
* At this point, we're about to do the shard pruning for fast-path queries.
|
||||||
* Given that pruning is deferred always for INSERTs, we get here
|
* Given that pruning is deferred always for INSERTs, we get here
|
||||||
* !EnableFastPathRouterPlanner as well.
|
* !EnableFastPathRouterPlanner as well.
|
||||||
*/
|
*/
|
||||||
Assert(workerJob->deferredPruning &&
|
Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);
|
||||||
(distributedPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner));
|
|
||||||
|
/*
|
||||||
|
* We can only now decide which shard to use, so we need to build a new task
|
||||||
|
* list.
|
||||||
|
*/
|
||||||
if (jobQuery->commandType == CMD_INSERT)
|
if (jobQuery->commandType == CMD_INSERT)
|
||||||
{
|
{
|
||||||
HandleDeferredShardPruningForInserts(distributedPlan);
|
RegenerateTaskListForInsert(workerJob);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
HandleDeferredShardPruningForFastPathQueries(distributedPlan);
|
RegenerateTaskForFasthPathQuery(workerJob);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (workerJob->requiresMasterEvaluation)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* When there is no deferred pruning, but we did evaluate functions, then
|
||||||
|
* we only rebuild the query strings in the existing tasks.
|
||||||
|
*/
|
||||||
|
RebuildQueryStrings(workerJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (jobQuery->commandType != CMD_SELECT)
|
/*
|
||||||
{
|
* Now that we know the shard ID(s) we can acquire the necessary shard metadata
|
||||||
|
* locks. Once we have the locks it's safe to load the placement metadata.
|
||||||
|
*/
|
||||||
|
|
||||||
/* prevent concurrent placement changes */
|
/* prevent concurrent placement changes */
|
||||||
AcquireMetadataLocks(workerJob->taskList);
|
AcquireMetadataLocks(workerJob->taskList);
|
||||||
|
|
||||||
/* modify tasks are always assigned using first-replica policy */
|
/* modify tasks are always assigned using first-replica policy */
|
||||||
workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
|
workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
|
||||||
}
|
|
||||||
|
|
||||||
if (list_length(distributedPlan->workerJob->taskList) != 1)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We might have zero shard queries or multi-row INSERTs at this point,
|
|
||||||
* we only want to cache single task queries.
|
|
||||||
*/
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* As long as the task accesses local node and the query doesn't have
|
* Now that we have populated the task placements we can determine whether
|
||||||
* any volatile functions, we cache the local Postgres plan on the
|
* any of them are local to this node and cache a plan if needed.
|
||||||
* shard for re-use.
|
|
||||||
*/
|
*/
|
||||||
Task *task = linitial(distributedPlan->workerJob->taskList);
|
if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
|
||||||
if (EnableLocalExecution && TaskAccessesLocalNode(task) &&
|
|
||||||
!contain_volatile_functions(
|
|
||||||
(Node *) originalDistributedPlan->workerJob->jobQuery))
|
|
||||||
{
|
|
||||||
CacheLocalPlanForTask(task, originalDistributedPlan);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
|
Task *task = linitial(workerJob->taskList);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we're not going to use a cached plan, we'll use the query string that is
|
* We are going to execute this task locally. If it's not already in
|
||||||
* already generated where the parameters are replaced, so we should not have
|
* the cache, create a local plan now and add it to the cache. During
|
||||||
* the parameters anymore.
|
* execution, we will get the plan from the cache.
|
||||||
|
*
|
||||||
|
* WARNING: In this function we'll use the original plan with the original
|
||||||
|
* query tree, meaning parameters and function calls are back and we'll
|
||||||
|
* redo evaluation in the local (Postgres) executor. The reason we do this
|
||||||
|
* is that we only need to cache one generic plan per shard.
|
||||||
|
*
|
||||||
|
* The plan will be cached across executions when originalDistributedPlan
|
||||||
|
* represents a prepared statement.
|
||||||
*/
|
*/
|
||||||
EState *executorState = planState->state;
|
CacheLocalPlanForShardQuery(task, originalDistributedPlan);
|
||||||
ResetExecutionParameters(executorState);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,15 +426,13 @@ CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, i
|
||||||
* reasons, as they are immutable, so no need to have a deep copy.
|
* reasons, as they are immutable, so no need to have a deep copy.
|
||||||
*/
|
*/
|
||||||
static DistributedPlan *
|
static DistributedPlan *
|
||||||
CopyDistributedPlanWithoutCache(CitusScanState *scanState)
|
CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan)
|
||||||
{
|
{
|
||||||
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
|
|
||||||
List *localPlannedStatements =
|
List *localPlannedStatements =
|
||||||
originalDistributedPlan->workerJob->localPlannedStatements;
|
originalDistributedPlan->workerJob->localPlannedStatements;
|
||||||
originalDistributedPlan->workerJob->localPlannedStatements = NIL;
|
originalDistributedPlan->workerJob->localPlannedStatements = NIL;
|
||||||
|
|
||||||
DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);
|
DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);
|
||||||
scanState->distributedPlan = distributedPlan;
|
|
||||||
|
|
||||||
/* set back the immutable field */
|
/* set back the immutable field */
|
||||||
originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
|
originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
|
||||||
|
@ -441,30 +443,12 @@ CopyDistributedPlanWithoutCache(CitusScanState *scanState)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ResetExecutionParameters set the parameter list to NULL. See the function
|
* CacheLocalPlanForShardQuery replaces the relation OIDs in the job query
|
||||||
* for details.
|
* with shard relation OIDs and then plans the query and caches the result
|
||||||
|
* in the originalDistributedPlan (which may be preserved across executions).
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ResetExecutionParameters(EState *executorState)
|
CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan)
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We've processed parameters in ExecuteMasterEvaluableFunctions and
|
|
||||||
* don't need to send their values to workers, since they will be
|
|
||||||
* represented as constants in the deparsed query. To avoid sending
|
|
||||||
* parameter values, we set the parameter list to NULL.
|
|
||||||
*/
|
|
||||||
executorState->es_param_list_info = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CacheLocalPlanForTask caches a plan that is local to this node in the
|
|
||||||
* originalDistributedPlan.
|
|
||||||
*
|
|
||||||
* The basic idea is to be able to skip planning on the shards when possible.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan)
|
|
||||||
{
|
{
|
||||||
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
|
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
|
||||||
if (localPlan != NULL)
|
if (localPlan != NULL)
|
||||||
|
@ -512,6 +496,15 @@ CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (IsLoggableLevel(DEBUG5))
|
||||||
|
{
|
||||||
|
StringInfo queryString = makeStringInfo();
|
||||||
|
pg_get_query_def(shardQuery, queryString);
|
||||||
|
|
||||||
|
ereport(DEBUG5, (errmsg("caching plan for query: %s",
|
||||||
|
queryString->data)));
|
||||||
|
}
|
||||||
|
|
||||||
LockRelationOid(rangeTableEntry->relid, lockMode);
|
LockRelationOid(rangeTableEntry->relid, lockMode);
|
||||||
|
|
||||||
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
|
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
|
||||||
|
@ -554,18 +547,86 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* HandleDeferredShardPruningForInserts does the shard pruning for INSERT
|
* IsLocalPlanCachingSupported returns whether (part of) the task can be planned
|
||||||
|
* and executed locally and whether caching is supported (single shard, no volatile
|
||||||
|
* functions).
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan)
|
||||||
|
{
|
||||||
|
if (!currentJob->deferredPruning)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* When not using deferred pruning we may have already replaced distributed
|
||||||
|
* table RTEs with citus_extradata_container RTEs to pass the shard ID to the
|
||||||
|
* deparser. In that case, we cannot pass the query tree directly to the
|
||||||
|
* planner.
|
||||||
|
*
|
||||||
|
* If desired, we can relax this check by improving the implementation of
|
||||||
|
* CacheLocalPlanForShardQuery to translate citus_extradata_container
|
||||||
|
* to a shard relation OID.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *taskList = currentJob->taskList;
|
||||||
|
if (list_length(taskList) != 1)
|
||||||
|
{
|
||||||
|
/* we only support plan caching for single shard queries */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Task *task = linitial(taskList);
|
||||||
|
if (!TaskAccessesLocalNode(task))
|
||||||
|
{
|
||||||
|
/* not a local task */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!EnableLocalExecution)
|
||||||
|
{
|
||||||
|
/* user requested not to use local execution */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TransactionConnectedToLocalGroup)
|
||||||
|
{
|
||||||
|
/* transaction already connected to localhost */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Query *originalJobQuery = originalDistributedPlan->workerJob->jobQuery;
|
||||||
|
if (contain_volatile_functions((Node *) originalJobQuery))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We do not cache plans with volatile functions in the query.
|
||||||
|
*
|
||||||
|
* The reason we care about volatile functions is primarily that we
|
||||||
|
* already executed them in ExecuteMasterEvaluableFunctionsAndParameters
|
||||||
|
* and since we're falling back to the original query tree here we would
|
||||||
|
* execute them again if we execute the plan.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RegenerateTaskListForInsert does the shard pruning for an INSERT query
|
||||||
* queries and rebuilds the query strings.
|
* queries and rebuilds the query strings.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan)
|
RegenerateTaskListForInsert(Job *workerJob)
|
||||||
{
|
{
|
||||||
Job *workerJob = distributedPlan->workerJob;
|
|
||||||
Query *jobQuery = workerJob->jobQuery;
|
Query *jobQuery = workerJob->jobQuery;
|
||||||
|
bool parametersInJobQueryResolved = workerJob->parametersInJobQueryResolved;
|
||||||
DeferredErrorMessage *planningError = NULL;
|
DeferredErrorMessage *planningError = NULL;
|
||||||
|
|
||||||
/* need to perform shard pruning, rebuild the task list from scratch */
|
/* need to perform shard pruning, rebuild the task list from scratch */
|
||||||
List *taskList = RouterInsertTaskList(jobQuery, &planningError);
|
List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
|
||||||
|
&planningError);
|
||||||
|
|
||||||
if (planningError != NULL)
|
if (planningError != NULL)
|
||||||
{
|
{
|
||||||
|
@ -575,21 +636,17 @@ HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan)
|
||||||
workerJob->taskList = taskList;
|
workerJob->taskList = taskList;
|
||||||
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
|
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
|
||||||
|
|
||||||
RebuildQueryStrings(jobQuery, workerJob->taskList);
|
RebuildQueryStrings(workerJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* HandleDeferredShardPruningForFastPathQueries does the shard pruning for
|
* RegenerateTaskForFasthPathQuery does the shard pruning for
|
||||||
* UPDATE/DELETE/SELECT fast path router queries and rebuilds the query strings.
|
* UPDATE/DELETE/SELECT fast path router queries and rebuilds the query strings.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
HandleDeferredShardPruningForFastPathQueries(DistributedPlan *distributedPlan)
|
RegenerateTaskForFasthPathQuery(Job *workerJob)
|
||||||
{
|
{
|
||||||
Assert(distributedPlan->fastPathRouterPlan);
|
|
||||||
|
|
||||||
Job *workerJob = distributedPlan->workerJob;
|
|
||||||
|
|
||||||
bool isMultiShardQuery = false;
|
bool isMultiShardQuery = false;
|
||||||
List *shardIntervalList =
|
List *shardIntervalList =
|
||||||
TargetShardIntervalForFastPathQuery(workerJob->jobQuery,
|
TargetShardIntervalForFastPathQuery(workerJob->jobQuery,
|
||||||
|
@ -756,10 +813,11 @@ static void
|
||||||
CitusReScan(CustomScanState *node)
|
CitusReScan(CustomScanState *node)
|
||||||
{
|
{
|
||||||
CitusScanState *scanState = (CitusScanState *) node;
|
CitusScanState *scanState = (CitusScanState *) node;
|
||||||
|
Job *workerJob = scanState->distributedPlan->workerJob;
|
||||||
EState *executorState = ScanStateGetExecutorState(scanState);
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
|
|
||||||
if (paramListInfo != NULL)
|
if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("Cursors for queries on distributed tables with "
|
errmsg("Cursors for queries on distributed tables with "
|
||||||
|
|
|
@ -184,15 +184,25 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Query *shardQuery = ParseQueryString(TaskQueryString(task), parameterTypes,
|
int taskNumParams = numParams;
|
||||||
numParams);
|
Oid *taskParameterTypes = parameterTypes;
|
||||||
|
|
||||||
|
if (task->parametersInQueryStringResolved)
|
||||||
|
{
|
||||||
/*
|
/*
|
||||||
* We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of
|
* Parameters were removed from the query string so do not pass them
|
||||||
* intermediate results in the query. That'd trigger ExecuteLocalTaskPlan()
|
* here. Otherwise, we might see errors when passing custom types,
|
||||||
* go through the distributed executor, which we do not want since the
|
* since their OIDs were set to 0 and their type is normally
|
||||||
* query is already known to be local.
|
* inferred from
|
||||||
*/
|
*/
|
||||||
|
taskNumParams = 0;
|
||||||
|
taskParameterTypes = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
Query *shardQuery = ParseQueryString(TaskQueryString(task),
|
||||||
|
taskParameterTypes,
|
||||||
|
taskNumParams);
|
||||||
|
|
||||||
int cursorOptions = 0;
|
int cursorOptions = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/insert_select_planner.h"
|
#include "distributed/insert_select_planner.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/local_executor.h"
|
#include "distributed/local_executor.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
|
@ -46,18 +47,20 @@ static char * DeparseTaskQuery(Task *task, Query *query);
|
||||||
* include execution-time changes such as function evaluation.
|
* include execution-time changes such as function evaluation.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RebuildQueryStrings(Query *originalQuery, List *taskList)
|
RebuildQueryStrings(Job *workerJob)
|
||||||
{
|
{
|
||||||
ListCell *taskCell = NULL;
|
Query *originalQuery = workerJob->jobQuery;
|
||||||
|
List *taskList = workerJob->taskList;
|
||||||
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
|
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
|
||||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
|
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
|
||||||
|
|
||||||
foreach(taskCell, taskList)
|
Task *task = NULL;
|
||||||
|
|
||||||
|
foreach_ptr(task, taskList)
|
||||||
{
|
{
|
||||||
Task *task = (Task *) lfirst(taskCell);
|
|
||||||
Query *query = originalQuery;
|
Query *query = originalQuery;
|
||||||
|
|
||||||
if (UpdateOrDeleteQuery(query) && list_length(taskList))
|
if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1)
|
||||||
{
|
{
|
||||||
query = copyObject(originalQuery);
|
query = copyObject(originalQuery);
|
||||||
}
|
}
|
||||||
|
@ -115,6 +118,12 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
|
||||||
|
|
||||||
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If parameters were resolved in the job query, then they are now also
|
||||||
|
* resolved in the query string.
|
||||||
|
*/
|
||||||
|
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
|
||||||
|
|
||||||
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
||||||
ApplyLogRedaction(TaskQueryString(task)))));
|
ApplyLogRedaction(TaskQueryString(task)))));
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,11 +158,11 @@ static int CompareInsertValuesByShardId(const void *leftElement,
|
||||||
const void *rightElement);
|
const void *rightElement);
|
||||||
static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
|
static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
|
||||||
List *relationShardList, List *placementList,
|
List *relationShardList, List *placementList,
|
||||||
uint64 shardId);
|
uint64 shardId, bool parametersInQueryResolved);
|
||||||
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
||||||
static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
|
static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
|
||||||
List *relationShardList, List *placementList,
|
List *relationShardList, List *placementList,
|
||||||
uint64 shardId);
|
uint64 shardId, bool parametersInQueryResolved);
|
||||||
static List * RemoveCoordinatorPlacement(List *placementList);
|
static List * RemoveCoordinatorPlacement(List *placementList);
|
||||||
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
||||||
TaskAssignmentPolicyType
|
TaskAssignmentPolicyType
|
||||||
|
@ -1447,7 +1447,9 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
taskList = RouterInsertTaskList(query, planningError);
|
bool parametersInQueryResolved = false;
|
||||||
|
|
||||||
|
taskList = RouterInsertTaskList(query, parametersInQueryResolved, planningError);
|
||||||
if (*planningError)
|
if (*planningError)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1457,19 +1459,20 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
|
||||||
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
|
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Job *job = CreateJob(originalQuery);
|
||||||
|
job->taskList = taskList;
|
||||||
|
job->requiresMasterEvaluation = requiresMasterEvaluation;
|
||||||
|
job->deferredPruning = deferredPruning;
|
||||||
|
|
||||||
if (!requiresMasterEvaluation)
|
if (!requiresMasterEvaluation)
|
||||||
{
|
{
|
||||||
/* no functions or parameters, build the query strings upfront */
|
/* no functions or parameters, build the query strings upfront */
|
||||||
RebuildQueryStrings(originalQuery, taskList);
|
RebuildQueryStrings(job);
|
||||||
|
|
||||||
/* remember the partition column value */
|
/* remember the partition column value */
|
||||||
partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
|
partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
Job *job = CreateJob(originalQuery);
|
|
||||||
job->taskList = taskList;
|
|
||||||
job->requiresMasterEvaluation = requiresMasterEvaluation;
|
|
||||||
job->deferredPruning = deferredPruning;
|
|
||||||
job->partitionKeyValue = partitionKeyValue;
|
job->partitionKeyValue = partitionKeyValue;
|
||||||
|
|
||||||
return job;
|
return job;
|
||||||
|
@ -1561,7 +1564,8 @@ ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry)
|
||||||
* a distributed table via the router executor.
|
* a distributed table via the router executor.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
|
RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
|
||||||
|
DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
List *insertTaskList = NIL;
|
List *insertTaskList = NIL;
|
||||||
ListCell *modifyRouteCell = NULL;
|
ListCell *modifyRouteCell = NULL;
|
||||||
|
@ -1593,8 +1597,8 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
|
||||||
relationShard->relationId = distributedTableId;
|
relationShard->relationId = distributedTableId;
|
||||||
|
|
||||||
modifyTask->relationShardList = list_make1(relationShard);
|
modifyTask->relationShardList = list_make1(relationShard);
|
||||||
|
|
||||||
modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
|
modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
|
||||||
|
modifyTask->parametersInQueryStringResolved = parametersInQueryResolved;
|
||||||
|
|
||||||
insertTaskList = lappend(insertTaskList, modifyTask);
|
insertTaskList = lappend(insertTaskList, modifyTask);
|
||||||
}
|
}
|
||||||
|
@ -1772,7 +1776,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
{
|
{
|
||||||
job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId,
|
job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId,
|
||||||
relationShardList, placementList,
|
relationShardList, placementList,
|
||||||
shardId);
|
shardId,
|
||||||
|
job->parametersInJobQueryResolved);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Queries to reference tables, or distributed tables with multiple replica's have
|
* Queries to reference tables, or distributed tables with multiple replica's have
|
||||||
|
@ -1798,7 +1803,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
{
|
{
|
||||||
job->taskList = SingleShardModifyTaskList(originalQuery, job->jobId,
|
job->taskList = SingleShardModifyTaskList(originalQuery, job->jobId,
|
||||||
relationShardList, placementList,
|
relationShardList, placementList,
|
||||||
shardId);
|
shardId,
|
||||||
|
job->parametersInJobQueryResolved);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1890,7 +1896,8 @@ RemoveCoordinatorPlacement(List *placementList)
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
|
SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
List *placementList, uint64 shardId)
|
List *placementList, uint64 shardId,
|
||||||
|
bool parametersInQueryResolved)
|
||||||
{
|
{
|
||||||
Task *task = CreateTask(SELECT_TASK);
|
Task *task = CreateTask(SELECT_TASK);
|
||||||
List *relationRowLockList = NIL;
|
List *relationRowLockList = NIL;
|
||||||
|
@ -1908,6 +1915,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
task->jobId = jobId;
|
task->jobId = jobId;
|
||||||
task->relationShardList = relationShardList;
|
task->relationShardList = relationShardList;
|
||||||
task->relationRowLockList = relationRowLockList;
|
task->relationRowLockList = relationRowLockList;
|
||||||
|
task->parametersInQueryStringResolved = parametersInQueryResolved;
|
||||||
|
|
||||||
return list_make1(task);
|
return list_make1(task);
|
||||||
}
|
}
|
||||||
|
@ -1960,7 +1968,8 @@ RowLocksOnRelations(Node *node, List **relationRowLockList)
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
|
SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
List *placementList, uint64 shardId)
|
List *placementList, uint64 shardId,
|
||||||
|
bool parametersInQueryResolved)
|
||||||
{
|
{
|
||||||
Task *task = CreateTask(MODIFY_TASK);
|
Task *task = CreateTask(MODIFY_TASK);
|
||||||
List *rangeTableList = NIL;
|
List *rangeTableList = NIL;
|
||||||
|
@ -1987,6 +1996,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
task->jobId = jobId;
|
task->jobId = jobId;
|
||||||
task->relationShardList = relationShardList;
|
task->relationShardList = relationShardList;
|
||||||
task->replicationModel = modificationTableCacheEntry->replicationModel;
|
task->replicationModel = modificationTableCacheEntry->replicationModel;
|
||||||
|
task->parametersInQueryStringResolved = parametersInQueryResolved;
|
||||||
|
|
||||||
return list_make1(task);
|
return list_make1(task);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,11 @@ static bool ShouldEvaluateFunctionWithMasterContext(MasterEvaluationContext *
|
||||||
bool
|
bool
|
||||||
RequiresMasterEvaluation(Query *query)
|
RequiresMasterEvaluation(Query *query)
|
||||||
{
|
{
|
||||||
|
if (query->commandType == CMD_SELECT)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return FindNodeCheck((Node *) query, CitusIsMutableFunction);
|
return FindNodeCheck((Node *) query, CitusIsMutableFunction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,7 @@ copyJobInfo(Job *newnode, Job *from)
|
||||||
COPY_SCALAR_FIELD(deferredPruning);
|
COPY_SCALAR_FIELD(deferredPruning);
|
||||||
COPY_NODE_FIELD(partitionKeyValue);
|
COPY_NODE_FIELD(partitionKeyValue);
|
||||||
COPY_NODE_FIELD(localPlannedStatements);
|
COPY_NODE_FIELD(localPlannedStatements);
|
||||||
|
COPY_SCALAR_FIELD(parametersInJobQueryResolved);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -274,6 +275,7 @@ CopyNodeTask(COPYFUNC_ARGS)
|
||||||
COPY_NODE_FIELD(relationRowLockList);
|
COPY_NODE_FIELD(relationRowLockList);
|
||||||
COPY_NODE_FIELD(rowValuesLists);
|
COPY_NODE_FIELD(rowValuesLists);
|
||||||
COPY_SCALAR_FIELD(partiallyLocalOrRemote);
|
COPY_SCALAR_FIELD(partiallyLocalOrRemote);
|
||||||
|
COPY_SCALAR_FIELD(parametersInQueryStringResolved);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -339,6 +339,7 @@ OutJobFields(StringInfo str, const Job *node)
|
||||||
WRITE_BOOL_FIELD(deferredPruning);
|
WRITE_BOOL_FIELD(deferredPruning);
|
||||||
WRITE_NODE_FIELD(partitionKeyValue);
|
WRITE_NODE_FIELD(partitionKeyValue);
|
||||||
WRITE_NODE_FIELD(localPlannedStatements);
|
WRITE_NODE_FIELD(localPlannedStatements);
|
||||||
|
WRITE_BOOL_FIELD(parametersInJobQueryResolved);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -492,6 +493,7 @@ OutTask(OUTFUNC_ARGS)
|
||||||
WRITE_NODE_FIELD(relationRowLockList);
|
WRITE_NODE_FIELD(relationRowLockList);
|
||||||
WRITE_NODE_FIELD(rowValuesLists);
|
WRITE_NODE_FIELD(rowValuesLists);
|
||||||
WRITE_BOOL_FIELD(partiallyLocalOrRemote);
|
WRITE_BOOL_FIELD(partiallyLocalOrRemote);
|
||||||
|
WRITE_BOOL_FIELD(parametersInQueryStringResolved);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
|
|
||||||
|
|
||||||
extern void RebuildQueryStrings(Query *originalQuery, List *taskList);
|
extern void RebuildQueryStrings(Job *workerJob);
|
||||||
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
||||||
extern void SetTaskQuery(Task *task, Query *query);
|
extern void SetTaskQuery(Task *task, Query *query);
|
||||||
extern void SetTaskQueryString(Task *task, char *queryString);
|
extern void SetTaskQueryString(Task *task, char *queryString);
|
||||||
|
|
|
@ -157,6 +157,13 @@ typedef struct Job
|
||||||
|
|
||||||
/* for local shard queries, we may save the local plan here */
|
/* for local shard queries, we may save the local plan here */
|
||||||
List *localPlannedStatements;
|
List *localPlannedStatements;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When we evaluate functions and parameters in jobQuery then we
|
||||||
|
* should no longer send the list of parameters along with the
|
||||||
|
* query.
|
||||||
|
*/
|
||||||
|
bool parametersInJobQueryResolved;
|
||||||
} Job;
|
} Job;
|
||||||
|
|
||||||
|
|
||||||
|
@ -274,6 +281,13 @@ typedef struct Task
|
||||||
* the task splitted into local and remote tasks.
|
* the task splitted into local and remote tasks.
|
||||||
*/
|
*/
|
||||||
bool partiallyLocalOrRemote;
|
bool partiallyLocalOrRemote;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When we evaluate functions and parameters in the query string then
|
||||||
|
* we should no longer send the list of parameters long with the
|
||||||
|
* query.
|
||||||
|
*/
|
||||||
|
bool parametersInQueryStringResolved;
|
||||||
} Task;
|
} Task;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,8 @@ extern List * RelationShardListForShardIntervalList(List *shardIntervalList,
|
||||||
bool *shardsPresent);
|
bool *shardsPresent);
|
||||||
extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
|
extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
|
||||||
bool replacePrunedQueryWithDummy);
|
bool replacePrunedQueryWithDummy);
|
||||||
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
|
extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
|
||||||
|
DeferredErrorMessage **planningError);
|
||||||
extern Const * ExtractInsertPartitionKeyValue(Query *query);
|
extern Const * ExtractInsertPartitionKeyValue(Query *query);
|
||||||
extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *
|
extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *
|
||||||
restrictionContext,
|
restrictionContext,
|
||||||
|
|
|
@ -47,6 +47,20 @@ SELECT create_distributed_table('collections_list', 'key');
|
||||||
CREATE TABLE collections_list_0
|
CREATE TABLE collections_list_0
|
||||||
PARTITION OF collections_list (key, ser, ts, collection_id, value)
|
PARTITION OF collections_list (key, ser, ts, collection_id, value)
|
||||||
FOR VALUES IN ( 0 );
|
FOR VALUES IN ( 0 );
|
||||||
|
-- create a volatile function that returns the local node id
|
||||||
|
CREATE OR REPLACE FUNCTION get_local_node_id_volatile()
|
||||||
|
RETURNS INT AS $$
|
||||||
|
DECLARE localGroupId int;
|
||||||
|
BEGIN
|
||||||
|
SELECT groupid INTO localGroupId FROM pg_dist_local_group;
|
||||||
|
RETURN localGroupId;
|
||||||
|
END; $$ language plpgsql VOLATILE;
|
||||||
|
SELECT create_distributed_function('get_local_node_id_volatile()');
|
||||||
|
create_distributed_function
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- connection worker and get ready for the tests
|
-- connection worker and get ready for the tests
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SET search_path TO local_shard_execution;
|
SET search_path TO local_shard_execution;
|
||||||
|
@ -634,6 +648,33 @@ PL/pgSQL function only_local_execution() line 5 at SQL statement
|
||||||
NOTICE: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
NOTICE: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
CONTEXT: SQL statement "DELETE FROM distributed_table WHERE key = 1"
|
CONTEXT: SQL statement "DELETE FROM distributed_table WHERE key = 1"
|
||||||
PL/pgSQL function only_local_execution() line 6 at SQL statement
|
PL/pgSQL function only_local_execution() line 6 at SQL statement
|
||||||
|
-- insert a row that we need in the next tests
|
||||||
|
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
|
||||||
|
NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, 21) ON CONFLICT(key) DO UPDATE SET value = '29'::text
|
||||||
|
-- make sure that functions can use local execution
|
||||||
|
CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation() AS $$
|
||||||
|
DECLARE nodeId INT;
|
||||||
|
BEGIN
|
||||||
|
-- fast path router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table WHERE key = 1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- regular router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = 1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
CALL only_local_execution_with_function_evaluation();
|
||||||
|
NOTICE: executing the command locally: SELECT local_shard_execution.get_local_node_id_volatile() AS get_local_node_id_volatile FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
CONTEXT: SQL statement "SELECT get_local_node_id_volatile() FROM distributed_table WHERE key = 1"
|
||||||
|
PL/pgSQL function only_local_execution_with_function_evaluation() line 5 at SQL statement
|
||||||
|
NOTICE: executing the command locally: SELECT local_shard_execution.get_local_node_id_volatile() AS get_local_node_id_volatile FROM (local_shard_execution.distributed_table_1470001 d1(key, value, age) JOIN local_shard_execution.distributed_table_1470001 d2(key, value, age) USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) 1)
|
||||||
|
CONTEXT: SQL statement "SELECT get_local_node_id_volatile() FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = 1"
|
||||||
|
PL/pgSQL function only_local_execution_with_function_evaluation() line 11 at SQL statement
|
||||||
CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$
|
CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$
|
||||||
DECLARE cnt INT;
|
DECLARE cnt INT;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
@ -652,6 +693,29 @@ PL/pgSQL function only_local_execution_with_params(integer) line 5 at SQL statem
|
||||||
NOTICE: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
NOTICE: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
CONTEXT: SQL statement "DELETE FROM distributed_table WHERE key = $1"
|
CONTEXT: SQL statement "DELETE FROM distributed_table WHERE key = $1"
|
||||||
PL/pgSQL function only_local_execution_with_params(integer) line 6 at SQL statement
|
PL/pgSQL function only_local_execution_with_params(integer) line 6 at SQL statement
|
||||||
|
CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation_param(int) AS $$
|
||||||
|
DECLARE nodeId INT;
|
||||||
|
BEGIN
|
||||||
|
-- fast path router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table WHERE key = $1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- regular router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = $1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
CALL only_local_execution_with_function_evaluation_param(1);
|
||||||
|
NOTICE: executing the command locally: SELECT local_shard_execution.get_local_node_id_volatile() AS get_local_node_id_volatile FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
CONTEXT: SQL statement "SELECT get_local_node_id_volatile() FROM distributed_table WHERE key = $1"
|
||||||
|
PL/pgSQL function only_local_execution_with_function_evaluation_param(integer) line 5 at SQL statement
|
||||||
|
NOTICE: executing the command locally: SELECT local_shard_execution.get_local_node_id_volatile() AS get_local_node_id_volatile FROM (local_shard_execution.distributed_table_1470001 d1(key, value, age) JOIN local_shard_execution.distributed_table_1470001 d2(key, value, age) USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) $1)
|
||||||
|
CONTEXT: SQL statement "SELECT get_local_node_id_volatile() FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = $1"
|
||||||
|
PL/pgSQL function only_local_execution_with_function_evaluation_param(integer) line 11 at SQL statement
|
||||||
CREATE OR REPLACE PROCEDURE local_execution_followed_by_dist() AS $$
|
CREATE OR REPLACE PROCEDURE local_execution_followed_by_dist() AS $$
|
||||||
DECLARE cnt INT;
|
DECLARE cnt INT;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
|
|
@ -53,50 +53,113 @@ SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key =
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- make sure that it is also true for fast-path router queries with paramaters
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
PREPARE p1(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1;
|
PREPARE fast_path_router_with_param(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1;
|
||||||
execute p1(1);
|
execute fast_path_router_with_param(1);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
execute p1(2);
|
execute fast_path_router_with_param(2);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
execute p1(3);
|
execute fast_path_router_with_param(3);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
execute p1(4);
|
execute fast_path_router_with_param(4);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
execute p1(5);
|
execute fast_path_router_with_param(5);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
execute p1(6);
|
execute fast_path_router_with_param(6);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
execute p1(7);
|
execute fast_path_router_with_param(7);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
execute p1(8);
|
execute fast_path_router_with_param(8);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- same query as fast_path_router_with_param, but with consts
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
PREPARE router_with_param(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table m1 JOIN master_evaluation_table m2 USING(key) WHERE key = $1;
|
||||||
|
execute router_with_param(1);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
execute router_with_param(2);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
execute router_with_param(3);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
execute router_with_param(4);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
execute router_with_param(5);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
execute router_with_param(6);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
execute router_with_param(7);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
execute router_with_param(8);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- same query as router_with_param, but with consts
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table m1 JOIN master_evaluation_table m2 USING(key) WHERE key = 1;
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -36,7 +36,8 @@ test: recursive_dml_queries_mx multi_mx_truncate_from_worker
|
||||||
test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
|
test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
|
||||||
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
|
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
|
||||||
test: multi_mx_metadata
|
test: multi_mx_metadata
|
||||||
test: multi_mx_call master_evaluation
|
test: master_evaluation master_evaluation_modify master_evaluation_select
|
||||||
|
test: multi_mx_call
|
||||||
test: multi_mx_function_call_delegation
|
test: multi_mx_function_call_delegation
|
||||||
test: multi_mx_modifications local_shard_execution
|
test: multi_mx_modifications local_shard_execution
|
||||||
test: multi_mx_transaction_recovery
|
test: multi_mx_transaction_recovery
|
||||||
|
|
|
@ -36,6 +36,16 @@ CREATE TABLE collections_list_0
|
||||||
PARTITION OF collections_list (key, ser, ts, collection_id, value)
|
PARTITION OF collections_list (key, ser, ts, collection_id, value)
|
||||||
FOR VALUES IN ( 0 );
|
FOR VALUES IN ( 0 );
|
||||||
|
|
||||||
|
-- create a volatile function that returns the local node id
|
||||||
|
CREATE OR REPLACE FUNCTION get_local_node_id_volatile()
|
||||||
|
RETURNS INT AS $$
|
||||||
|
DECLARE localGroupId int;
|
||||||
|
BEGIN
|
||||||
|
SELECT groupid INTO localGroupId FROM pg_dist_local_group;
|
||||||
|
RETURN localGroupId;
|
||||||
|
END; $$ language plpgsql VOLATILE;
|
||||||
|
SELECT create_distributed_function('get_local_node_id_volatile()');
|
||||||
|
|
||||||
-- connection worker and get ready for the tests
|
-- connection worker and get ready for the tests
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SET search_path TO local_shard_execution;
|
SET search_path TO local_shard_execution;
|
||||||
|
@ -361,6 +371,29 @@ $$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
CALL only_local_execution();
|
CALL only_local_execution();
|
||||||
|
|
||||||
|
-- insert a row that we need in the next tests
|
||||||
|
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
|
||||||
|
|
||||||
|
-- make sure that functions can use local execution
|
||||||
|
CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation() AS $$
|
||||||
|
DECLARE nodeId INT;
|
||||||
|
BEGIN
|
||||||
|
-- fast path router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table WHERE key = 1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- regular router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = 1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
CALL only_local_execution_with_function_evaluation();
|
||||||
|
|
||||||
CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$
|
CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$
|
||||||
DECLARE cnt INT;
|
DECLARE cnt INT;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
@ -372,6 +405,25 @@ $$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
CALL only_local_execution_with_params(1);
|
CALL only_local_execution_with_params(1);
|
||||||
|
|
||||||
|
CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation_param(int) AS $$
|
||||||
|
DECLARE nodeId INT;
|
||||||
|
BEGIN
|
||||||
|
-- fast path router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table WHERE key = $1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- regular router
|
||||||
|
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = $1;
|
||||||
|
IF nodeId <= 0 THEN
|
||||||
|
RAISE NOTICE 'unexpected node id';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
CALL only_local_execution_with_function_evaluation_param(1);
|
||||||
|
|
||||||
CREATE OR REPLACE PROCEDURE local_execution_followed_by_dist() AS $$
|
CREATE OR REPLACE PROCEDURE local_execution_followed_by_dist() AS $$
|
||||||
DECLARE cnt INT;
|
DECLARE cnt INT;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
|
|
@ -36,16 +36,33 @@ INSERT INTO master_evaluation_table SELECT i, i FROM generate_series(0,100)i;
|
||||||
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
|
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
|
||||||
|
|
||||||
-- make sure that it is also true for fast-path router queries with paramaters
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
PREPARE p1(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1;
|
PREPARE fast_path_router_with_param(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1;
|
||||||
|
|
||||||
execute p1(1);
|
execute fast_path_router_with_param(1);
|
||||||
execute p1(2);
|
execute fast_path_router_with_param(2);
|
||||||
execute p1(3);
|
execute fast_path_router_with_param(3);
|
||||||
execute p1(4);
|
execute fast_path_router_with_param(4);
|
||||||
execute p1(5);
|
execute fast_path_router_with_param(5);
|
||||||
execute p1(6);
|
execute fast_path_router_with_param(6);
|
||||||
execute p1(7);
|
execute fast_path_router_with_param(7);
|
||||||
execute p1(8);
|
execute fast_path_router_with_param(8);
|
||||||
|
|
||||||
|
-- same query as fast_path_router_with_param, but with consts
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
|
||||||
|
|
||||||
|
PREPARE router_with_param(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table m1 JOIN master_evaluation_table m2 USING(key) WHERE key = $1;
|
||||||
|
|
||||||
|
execute router_with_param(1);
|
||||||
|
execute router_with_param(2);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(4);
|
||||||
|
execute router_with_param(5);
|
||||||
|
execute router_with_param(6);
|
||||||
|
execute router_with_param(7);
|
||||||
|
execute router_with_param(8);
|
||||||
|
|
||||||
|
-- same query as router_with_param, but with consts
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table m1 JOIN master_evaluation_table m2 USING(key) WHERE key = 1;
|
||||||
|
|
||||||
-- for multi-shard queries, we still expect the evaluation to happen on the workers
|
-- for multi-shard queries, we still expect the evaluation to happen on the workers
|
||||||
SELECT count(*), max(get_local_node_id_volatile()) != 0, min(get_local_node_id_volatile()) != 0 FROM master_evaluation_table;
|
SELECT count(*), max(get_local_node_id_volatile()) != 0, min(get_local_node_id_volatile()) != 0 FROM master_evaluation_table;
|
||||||
|
|
|
@ -0,0 +1,502 @@
|
||||||
|
|
||||||
|
-- This test relies on metadata being synced
|
||||||
|
-- that's why is should be executed on MX schedule
|
||||||
|
CREATE SCHEMA master_evaluation_combinations_modify;
|
||||||
|
SET search_path TO master_evaluation_combinations_modify;
|
||||||
|
|
||||||
|
-- in this test, we are considering combinations of
|
||||||
|
-- several Citus features, and there is one prepared
|
||||||
|
-- statement for the combinations of following:
|
||||||
|
-- (a) Router Modify vs Fast Path Router Modify
|
||||||
|
-- (b) Local Execution vs Remote Execution
|
||||||
|
-- (c) Parameters on distribution key vs Parameters on non-dist key
|
||||||
|
-- vs Non-parametrized queries
|
||||||
|
-- (d) Master Function Evaluation Required vs
|
||||||
|
-- Master Function Evaluation Not Required
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 1180000;
|
||||||
|
|
||||||
|
-- create a volatile function that returns the local node id
|
||||||
|
CREATE OR REPLACE FUNCTION get_local_node_id_stable()
|
||||||
|
RETURNS INT AS $$
|
||||||
|
DECLARE localGroupId int;
|
||||||
|
BEGIN
|
||||||
|
SELECT groupid INTO localGroupId FROM pg_dist_local_group;
|
||||||
|
RETURN localGroupId;
|
||||||
|
END; $$ language plpgsql STABLE;
|
||||||
|
SELECT create_distributed_function('get_local_node_id_stable()');
|
||||||
|
|
||||||
|
-- returns 1 on coordinator
|
||||||
|
CREATE OR REPLACE FUNCTION get_constant_stable()
|
||||||
|
RETURNS INT AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN 1;
|
||||||
|
END; $$ language plpgsql STABLE;
|
||||||
|
|
||||||
|
CREATE TYPE user_data AS (name text, age int);
|
||||||
|
|
||||||
|
SET citus.replication_model TO streaming;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE TABLE user_info_data (user_id int, u_data user_data);
|
||||||
|
SELECT create_distributed_table('user_info_data', 'user_id');
|
||||||
|
|
||||||
|
-- show that local id is 0, we'll use this information
|
||||||
|
SELECT get_local_node_id_stable();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT i, ('name' || i, i % 20 + 20)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
execute fast_path_router_with_param(0);
|
||||||
|
execute fast_path_router_with_param(1);
|
||||||
|
execute fast_path_router_with_param(2);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(4);
|
||||||
|
execute fast_path_router_with_param(5);
|
||||||
|
execute fast_path_router_with_param(6);
|
||||||
|
execute fast_path_router_with_param(7);
|
||||||
|
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('test', get_local_node_id_stable())::user_data AND user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT i, ('test', 0)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
-- should evaluate the function on the coordinator, hence get_local_node_id_stable() returns zero
|
||||||
|
execute fast_path_router_with_param_and_func(0);
|
||||||
|
execute fast_path_router_with_param_and_func(1);
|
||||||
|
execute fast_path_router_with_param_and_func(2);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(4);
|
||||||
|
execute fast_path_router_with_param_and_func(5);
|
||||||
|
execute fast_path_router_with_param_and_func(6);
|
||||||
|
execute fast_path_router_with_param_and_func(7);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT 1, ('test' || i, 0)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key_and_func(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 1 RETURNING *;
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test0', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test1', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test2', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test3', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test4', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test5', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test6', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('test7', get_local_node_id_stable())::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT 1, ('test', i)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 1 RETURNING
|
||||||
|
*;
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 0)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 3)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 4)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 5)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 6)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 7)::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT i, ('test', i)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 RETURNING
|
||||||
|
*;
|
||||||
|
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 0)::user_data, 0);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 1)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 2);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 3)::user_data, 3);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 4)::user_data, 4);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 5)::user_data, 5);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 6)::user_data, 6);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 7)::user_data, 7);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_only_function AS DELETE FROM user_info_data WHERE get_local_node_id_stable() = 0 AND user_id = 1 RETURNING *;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
|
||||||
|
ALTER TABLE user_info_data ADD COLUMN user_index INT;
|
||||||
|
|
||||||
|
PREPARE insert_with_function_and_param(user_data) AS INSERT INTO user_info_data VALUES (1, $1, get_local_node_id_stable()) RETURNING user_id;
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
|
||||||
|
ALTER TABLE user_info_data DROP COLUMN user_index;
|
||||||
|
TRUNCATE user_info_data;
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT i, ('test', i)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
-- make sure that it is also true for non fast-path router queries with paramaters
|
||||||
|
PREPARE router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 AND user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
execute router_with_param(0);
|
||||||
|
execute router_with_param(1);
|
||||||
|
execute router_with_param(2);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(4);
|
||||||
|
execute router_with_param(5);
|
||||||
|
execute router_with_param(6);
|
||||||
|
execute router_with_param(7);
|
||||||
|
|
||||||
|
|
||||||
|
-- make sure that it is also true for non fast-path router queries with paramaters
|
||||||
|
PREPARE router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('test', get_local_node_id_stable())::user_data AND user_id = $1 AND user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT i, ('test', 0)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
execute router_with_param_and_func(0);
|
||||||
|
execute router_with_param_and_func(1);
|
||||||
|
execute router_with_param_and_func(2);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(4);
|
||||||
|
execute router_with_param_and_func(5);
|
||||||
|
execute router_with_param_and_func(6);
|
||||||
|
execute router_with_param_and_func(7);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT 1, ('test' || i, 0)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key_and_func(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 1 AND user_id = 1 RETURNING *;
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test0', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test1', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test2', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test3', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test4', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test5', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test6', get_local_node_id_stable())::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('test7', get_local_node_id_stable())::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT 1, ('test', i)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 1 AND user_id = 1 RETURNING *;
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 0)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 3)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 4)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 5)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 6)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 7)::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data SELECT i, ('test', i)::user_data FROM generate_series(0,7)i;
|
||||||
|
|
||||||
|
PREPARE router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 AND user_id = $2 RETURNING
|
||||||
|
*;
|
||||||
|
|
||||||
|
EXECUTE router_with_two_params(('test', 0)::user_data, 0);
|
||||||
|
EXECUTE router_with_two_params(('test', 1)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 2);
|
||||||
|
EXECUTE router_with_two_params(('test', 3)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('test', 4)::user_data, 4);
|
||||||
|
EXECUTE router_with_two_params(('test', 5)::user_data, 5);
|
||||||
|
EXECUTE router_with_two_params(('test', 6)::user_data, 6);
|
||||||
|
EXECUTE router_with_two_params(('test', 7)::user_data, 7);
|
||||||
|
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
|
||||||
|
PREPARE router_with_only_function AS DELETE FROM user_info_data WHERE get_local_node_id_stable() = 0 AND user_id = 1 AND user_id = 1 RETURNING *;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data VALUES(1, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
|
||||||
|
SET citus.log_local_commands TO ON;
|
||||||
|
SET search_path TO master_evaluation_combinations_modify;
|
||||||
|
|
||||||
|
-- returns 2 on the worker
|
||||||
|
CREATE OR REPLACE FUNCTION get_constant_stable()
|
||||||
|
RETURNS INT AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN 2;
|
||||||
|
END; $$ language plpgsql STABLE;
|
||||||
|
|
||||||
|
|
||||||
|
-- all local values
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES
|
||||||
|
(3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'),
|
||||||
|
(9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'),
|
||||||
|
(14, '(''test14'', 14)'), (16, '(''test16'', 16)');
|
||||||
|
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(4);
|
||||||
|
execute fast_path_router_with_param(7);
|
||||||
|
execute fast_path_router_with_param(9);
|
||||||
|
execute fast_path_router_with_param(11);
|
||||||
|
execute fast_path_router_with_param(12);
|
||||||
|
execute fast_path_router_with_param(14);
|
||||||
|
execute fast_path_router_with_param(16);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES
|
||||||
|
(3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'),
|
||||||
|
(9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'),
|
||||||
|
(14, '(''test'', 2)'), (16, '(''test'', 2)');
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(4);
|
||||||
|
execute fast_path_router_with_param_and_func(7);
|
||||||
|
execute fast_path_router_with_param_and_func(9);
|
||||||
|
execute fast_path_router_with_param_and_func(11);
|
||||||
|
execute fast_path_router_with_param_and_func(12);
|
||||||
|
execute fast_path_router_with_param_and_func(14);
|
||||||
|
execute fast_path_router_with_param_and_func(16);
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key_and_func(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 3 RETURNING *;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)'::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 3 RETURNING *;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES
|
||||||
|
(3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)),
|
||||||
|
(9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)),
|
||||||
|
(14, ('test', 2)), (16, ('test', 2));
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 RETURNING *;
|
||||||
|
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 3);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 4);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 7);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 9);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 11);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 12);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 14);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 16);
|
||||||
|
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_only_function AS DELETE FROM user_info_data WHERE get_constant_stable() = 2AND user_id = 3 RETURNING *;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
|
||||||
|
|
||||||
|
-------
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO master_evaluation_combinations_modify;
|
||||||
|
ALTER TABLE user_info_data ADD COLUMN user_index INT;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO master_evaluation_combinations_modify;
|
||||||
|
|
||||||
|
PREPARE insert_with_function_and_param(user_data) AS INSERT INTO user_info_data VALUES (3, $1, get_local_node_id_stable()) RETURNING user_id;
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
EXECUTE insert_with_function_and_param(('test', 1)::user_data);
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO master_evaluation_combinations_modify;
|
||||||
|
ALTER TABLE user_info_data DROP COLUMN user_index;
|
||||||
|
TRUNCATE user_info_data;
|
||||||
|
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET citus.log_local_commands TO ON;
|
||||||
|
SET search_path TO master_evaluation_combinations_modify;
|
||||||
|
|
||||||
|
-- all local values
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES
|
||||||
|
(3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'),
|
||||||
|
(9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'),
|
||||||
|
(14, '(''test14'', 14)'), (16, '(''test16'', 16)');
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 AND user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(4);
|
||||||
|
execute router_with_param(7);
|
||||||
|
execute router_with_param(9);
|
||||||
|
execute router_with_param(11);
|
||||||
|
execute router_with_param(12);
|
||||||
|
execute router_with_param(14);
|
||||||
|
execute router_with_param(16);
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES
|
||||||
|
(3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'),
|
||||||
|
(9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'),
|
||||||
|
(14, '(''test'', 2)'), (16, '(''test'', 2)');
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 AND user_id = $1 RETURNING *;
|
||||||
|
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(4);
|
||||||
|
execute router_with_param_and_func(7);
|
||||||
|
execute router_with_param_and_func(9);
|
||||||
|
execute router_with_param_and_func(11);
|
||||||
|
execute router_with_param_and_func(12);
|
||||||
|
execute router_with_param_and_func(14);
|
||||||
|
execute router_with_param_and_func(16);
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key_and_func(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 3 AND user_id = 3 RETURNING *;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)'::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)');
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('''test''', get_constant_stable())::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key(user_data) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = 3 AND user_id = 3 RETURNING *;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 1)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('test', 1)::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES
|
||||||
|
(3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)),
|
||||||
|
(9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)),
|
||||||
|
(14, ('test', 2)), (16, ('test', 2));
|
||||||
|
|
||||||
|
PREPARE router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 AND user_id = $2 RETURNING *;
|
||||||
|
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 4);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 7);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 9);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 11);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 12);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 14);
|
||||||
|
EXECUTE router_with_two_params(('test', 2)::user_data, 16);
|
||||||
|
|
||||||
|
|
||||||
|
PREPARE router_with_only_function AS DELETE FROM user_info_data WHERE get_constant_stable() = 2AND user_id = 3 RETURNING *;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)::user_data);
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
|
||||||
|
-- suppress notices
|
||||||
|
\c - - - :master_port
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA master_evaluation_combinations_modify CASCADE;
|
|
@ -0,0 +1,373 @@
|
||||||
|
|
||||||
|
-- This test relies on metadata being synced
|
||||||
|
-- that's why is should be executed on MX schedule
|
||||||
|
|
||||||
|
-- in this test, we are considering combinations of
|
||||||
|
-- several Citus features, and there is one prepared
|
||||||
|
-- statement for the combinations of following:
|
||||||
|
-- (a) Router Select vs Fast Path Router Select
|
||||||
|
-- (b) Local Execution vs Remote Execution
|
||||||
|
-- (c) Parameters on distribution key vs Parameters on non-dist key
|
||||||
|
-- vs Non-parametrized queries
|
||||||
|
-- (d) Master Function Evaluation Required vs
|
||||||
|
-- Master Function Evaluation Not Required
|
||||||
|
|
||||||
|
CREATE SCHEMA master_evaluation_combinations;
|
||||||
|
SET search_path TO master_evaluation_combinations;
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 1170000;
|
||||||
|
|
||||||
|
-- create a volatile function that returns the local node id
|
||||||
|
CREATE OR REPLACE FUNCTION get_local_node_id_volatile()
|
||||||
|
RETURNS INT AS $$
|
||||||
|
DECLARE localGroupId int;
|
||||||
|
BEGIN
|
||||||
|
SELECT groupid INTO localGroupId FROM pg_dist_local_group;
|
||||||
|
RETURN localGroupId;
|
||||||
|
END; $$ language plpgsql VOLATILE;
|
||||||
|
SELECT create_distributed_function('get_local_node_id_volatile()');
|
||||||
|
|
||||||
|
CREATE TYPE user_data AS (name text, age int);
|
||||||
|
|
||||||
|
SET citus.replication_model TO streaming;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE TABLE user_info_data (user_id int, u_data user_data);
|
||||||
|
SELECT create_distributed_table('user_info_data', 'user_id');
|
||||||
|
|
||||||
|
-- show that local id is 0, we'll use this information
|
||||||
|
SELECT get_local_node_id_volatile();
|
||||||
|
|
||||||
|
-- load data
|
||||||
|
INSERT INTO user_info_data SELECT i, ('name' || i, i % 20 + 20)::user_data FROM generate_series(0,100)i;
|
||||||
|
|
||||||
|
-- we expect that the function is evaluated on the worker node, so we should get a row
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 1;
|
||||||
|
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param(int) AS SELECT count(*) FROM user_info_data WHERE user_id = $1;
|
||||||
|
|
||||||
|
execute fast_path_router_with_param(1);
|
||||||
|
execute fast_path_router_with_param(2);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(4);
|
||||||
|
execute fast_path_router_with_param(5);
|
||||||
|
execute fast_path_router_with_param(6);
|
||||||
|
execute fast_path_router_with_param(7);
|
||||||
|
execute fast_path_router_with_param(8);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 1;
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param_and_func(int) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = $1;
|
||||||
|
|
||||||
|
execute fast_path_router_with_param_and_func(1);
|
||||||
|
execute fast_path_router_with_param_and_func(2);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(4);
|
||||||
|
execute fast_path_router_with_param_and_func(5);
|
||||||
|
execute fast_path_router_with_param_and_func(6);
|
||||||
|
execute fast_path_router_with_param_and_func(7);
|
||||||
|
execute fast_path_router_with_param_and_func(8);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 1 AND u_data = ('name1', 21)::user_data;
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key_and_func(user_data) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 1 AND u_data = $1;
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data WHERE user_id = 1 AND u_data = ('name1', 21)::user_data;
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key(user_data) AS SELECT count(*) FROM user_info_data WHERE user_id = 1 AND u_data = $1;
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_two_params(user_data, int) AS SELECT count(*) FROM user_info_data WHERE user_id = $2 AND u_data = $1;
|
||||||
|
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE fast_path_router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 1;
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_only_function AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 1;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = 1;
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE router_with_param(int) AS SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = $1;
|
||||||
|
|
||||||
|
execute router_with_param(1);
|
||||||
|
execute router_with_param(2);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(4);
|
||||||
|
execute router_with_param(5);
|
||||||
|
execute router_with_param(6);
|
||||||
|
execute router_with_param(7);
|
||||||
|
execute router_with_param(8);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data m1 JOIN user_info_data m2 USING(user_id) WHERE m1.user_id = 1;
|
||||||
|
|
||||||
|
PREPARE router_with_param_and_func(int) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data m1 JOIN user_info_data m2 USING(user_id) WHERE m1.user_id = $1;
|
||||||
|
|
||||||
|
execute router_with_param_and_func(1);
|
||||||
|
execute router_with_param_and_func(2);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(4);
|
||||||
|
execute router_with_param_and_func(5);
|
||||||
|
execute router_with_param_and_func(6);
|
||||||
|
execute router_with_param_and_func(7);
|
||||||
|
execute router_with_param_and_func(8);
|
||||||
|
|
||||||
|
-- same query as router_with_param, but with consts
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data m1 JOIN user_info_data m2 USING(user_id) WHERE m1.user_id = 1;
|
||||||
|
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key(user_data) AS SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 1 AND u1.u_data = $1;
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name1', 21)::user_data);
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 1 AND u1.u_data = ('name1', 21)::user_data;
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key_and_func(user_data) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 1 AND u1.u_data = $1;
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name1', 21)::user_data);
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = 1 AND u1.u_data = ('name1', 21)::user_data;
|
||||||
|
|
||||||
|
PREPARE router_with_two_params(user_data, int) AS SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = $2 AND u1.u_data = $1;
|
||||||
|
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
EXECUTE router_with_two_params(('name1', 21)::user_data, 1);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING(user_id) WHERE user_id = 1;
|
||||||
|
|
||||||
|
PREPARE router_with_only_function AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING(user_id) WHERE user_id = 1;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
|
||||||
|
SET citus.log_local_commands TO ON;
|
||||||
|
SET search_path TO master_evaluation_combinations;
|
||||||
|
|
||||||
|
-- show that the data with user_id = 3 is local
|
||||||
|
SELECT count(*) FROM user_info_data WHERE user_id = 3;
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param(int) AS SELECT count(*) FROM user_info_data WHERE user_id = $1;
|
||||||
|
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
execute fast_path_router_with_param(3);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 3;
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE fast_path_router_with_param_and_func(int) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = $1;
|
||||||
|
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(3);
|
||||||
|
execute fast_path_router_with_param_and_func(8);
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 3 AND u_data = ('name3', 23)::user_data;
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key_and_func(user_data) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 3 AND u_data = $1;
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data WHERE user_id = 3 AND u_data = ('name3', 23)::user_data;
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_param_on_non_dist_key(user_data) AS SELECT count(*) FROM user_info_data WHERE user_id = 3 AND u_data = $1;
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE fast_path_router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
|
||||||
|
PREPARE fast_path_router_with_only_function AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data WHERE user_id = 3;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
EXECUTE fast_path_router_with_only_function;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = 3;
|
||||||
|
|
||||||
|
-- make sure that it is also true for fast-path router queries with paramaters
|
||||||
|
PREPARE router_with_param(int) AS SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = $1;
|
||||||
|
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(3);
|
||||||
|
execute router_with_param(3);
|
||||||
|
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data m1 JOIN user_info_data m2 USING(user_id) WHERE m1.user_id = 3;
|
||||||
|
|
||||||
|
PREPARE router_with_param_and_func(int) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data m1 JOIN user_info_data m2 USING(user_id) WHERE m1.user_id = $1;
|
||||||
|
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
execute router_with_param_and_func(3);
|
||||||
|
|
||||||
|
-- same query as router_with_param, but with consts
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data m1 JOIN user_info_data m2 USING(user_id) WHERE m1.user_id = 3;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 3 AND u1.u_data = ('name3', 23)::user_data;
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 3 AND u1.u_data = ('name3', 23)::user_data;
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key(user_data) AS SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 3 AND u1.u_data = $1;
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key(('name3', 23)::user_data);
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 3 AND u1.u_data = ('name3', 23)::user_data;
|
||||||
|
|
||||||
|
PREPARE router_with_param_on_non_dist_key_and_func(user_data) AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE u1.user_id = 3 AND u1.u_data = $1;
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
EXECUTE router_with_param_on_non_dist_key_and_func(('name3', 23)::user_data);
|
||||||
|
|
||||||
|
SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = 3 AND u1.u_data = ('name3', 23)::user_data;
|
||||||
|
|
||||||
|
PREPARE router_with_two_params(user_data, int) AS SELECT count(*) FROM user_info_data u1 JOIN user_info_data u2 USING (user_id) WHERE user_id = $2 AND u1.u_data = $1;
|
||||||
|
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
EXECUTE router_with_two_params(('name3', 23)::user_data, 3);
|
||||||
|
|
||||||
|
SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING(user_id) WHERE user_id = 3;
|
||||||
|
|
||||||
|
PREPARE router_with_only_function AS SELECT get_local_node_id_volatile() > 0 FROM user_info_data u1 JOIN user_info_data u2 USING(user_id) WHERE user_id = 3;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
EXECUTE router_with_only_function;
|
||||||
|
|
||||||
|
|
||||||
|
-- suppress notices
|
||||||
|
\c - - - :master_port
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA master_evaluation_combinations CASCADE;
|
Loading…
Reference in New Issue