Merge pull request #2938 from citusdata/local_execution_2

Introduce the concept of Local Execution
Önder Kalacı 2019-09-12 12:18:43 +02:00 committed by GitHub
commit 07cca85227
26 changed files with 3712 additions and 35 deletions


@ -71,6 +71,7 @@
#include "distributed/commands/multi_copy.h" #include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/intermediate_results.h" #include "distributed/intermediate_results.h"
#include "distributed/local_executor.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
@ -2233,6 +2234,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
const char *delimiterCharacter = "\t"; const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N"; const char *nullPrintCharacter = "\\N";
/* Citus currently doesn't know how to handle COPY command locally */
ErrorIfLocalExecutionHappened();
/* look up table properties */ /* look up table properties */
distributedRelation = heap_open(tableId, RowExclusiveLock); distributedRelation = heap_open(tableId, RowExclusiveLock);
cacheEntry = DistributedTableCacheEntry(tableId); cacheEntry = DistributedTableCacheEntry(tableId);
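As a consequence of the check above, COPY cannot be mixed with local execution inside a transaction block. A minimal sketch of the resulting behavior on a Citus MX worker node, assuming a hypothetical distributed table dist_table whose shard for key = 1 is placed on that node (error and hint text taken from ErrorIfLocalExecutionHappened() further below):

BEGIN;
-- single-shard query on a shard placed on this node: runs via local execution
SELECT count(*) FROM dist_table WHERE key = 1;
-- COPY into the same table now fails, because a local execution has already
-- happened in this transaction
\copy dist_table FROM STDIN WITH CSV
-- ERROR:  cannot execute command because a local execution has already been
--         done in the transaction
-- HINT:   Try re-running the transaction with
--         "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK;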


@ -31,14 +31,17 @@
#include "access/attnum.h" #include "access/attnum.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/commands/multi_copy.h" #include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */ #include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */
#include "distributed/local_executor.h"
#include "distributed/maintenanced.h" #include "distributed/maintenanced.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -140,6 +143,49 @@ multi_ProcessUtility(PlannedStmt *pstmt,
return; return;
} }
if (IsA(parsetree, ExplainStmt) &&
IsA(((ExplainStmt *) parsetree)->query, Query))
{
ExplainStmt *explainStmt = (ExplainStmt *) parsetree;
if (IsTransactionBlock())
{
ListCell *optionCell = NULL;
bool analyze = false;
foreach(optionCell, explainStmt->options)
{
DefElem *option = (DefElem *) lfirst(optionCell);
if (strcmp(option->defname, "analyze") == 0)
{
analyze = defGetBoolean(option);
}
/* don't "break", as explain.c will use the last value */
}
if (analyze)
{
/*
* Since we cannot execute EXPLAIN ANALYZE locally, we
* cannot continue.
*/
ErrorIfLocalExecutionHappened();
}
}
/*
* EXPLAIN ANALYZE is tricky with local execution, and there is not
* much difference between the local and distributed execution in terms
* of the actual EXPLAIN output.
*
* TODO: It might be nice to have a way to show that the query is locally
* executed. Shall we add an INFO output?
*/
DisableLocalExecution();
}
if (IsA(parsetree, CreateSubscriptionStmt))
{
CreateSubscriptionStmt *createSubStmt = (CreateSubscriptionStmt *) parsetree;
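To illustrate the EXPLAIN handling above: plain EXPLAIN merely disables local execution for the statement, while EXPLAIN ANALYZE inside a transaction block errors out if a local execution has already happened. A hedged sketch on a Citus MX worker, again using a hypothetical distributed table dist_table:

BEGIN;
-- runs via local execution (single-shard task on a shard placed on this node)
INSERT INTO dist_table VALUES (1, 'local', 20);
-- EXPLAIN ANALYZE cannot be executed locally, so this errors out
EXPLAIN ANALYZE SELECT * FROM dist_table WHERE key = 1;
-- ERROR:  cannot execute command because a local execution has already been
--         done in the transaction
ROLLBACK;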


@ -1259,3 +1259,19 @@ ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
return 0;
}
}
/*
* AnyConnectionAccessedPlacements simply checks the number of entries in
* ConnectionPlacementHash. This is useful to detect whether we're in a
* distributed transaction and have already executed at least one command
* that accessed a placement.
*/
bool
AnyConnectionAccessedPlacements(void)
{
/* this is initialized on PG_INIT */
Assert(ConnectionPlacementHash != NULL);
return hash_get_num_entries(ConnectionPlacementHash) > 0;
}
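ShouldExecuteTasksLocally() in local_executor.c uses this helper to decide whether a single-shard task may be taken over by the local executor: it only does so when no connection in the current transaction has accessed a placement yet. A rough sketch of the resulting behavior, assuming a hypothetical distributed table dist_table on a Citus MX worker:

BEGIN;
-- multi-shard query: executed over connections, so placements get accessed
SELECT count(*) FROM dist_table;
-- this single-shard query touches a local shard, but since connections already
-- accessed placements it is also executed over a connection, not locally
SELECT count(*) FROM dist_table WHERE key = 1;
COMMIT;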


@ -131,6 +131,7 @@
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/local_executor.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
@ -161,7 +162,14 @@ typedef struct DistributedExecution
/* the corresponding distributed plan's modLevel */ /* the corresponding distributed plan's modLevel */
RowModifyLevel modLevel; RowModifyLevel modLevel;
/*
* tasksToExecute contains all the tasks required to finish the execution, and
* it is the union of remoteTaskList and localTaskList. After (if any) local
* tasks are executed, remoteTaskList becomes equivalent of tasksToExecute.
*/
List *tasksToExecute; List *tasksToExecute;
List *remoteTaskList;
List *localTaskList;
/* the corresponding distributed plan has RETURNING */ /* the corresponding distributed plan has RETURNING */
bool hasReturning; bool hasReturning;
@ -519,6 +527,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
Tuplestorestate *tupleStore,
int targetPoolSize);
static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution);
static bool ShouldRunTasksSequentially(List *taskList);
static void SequentialRunDistributedExecution(DistributedExecution *execution);
@ -528,8 +537,9 @@ static void CleanUpSessions(DistributedExecution *execution);
static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
static void AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *
execution);
static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
static bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
@ -588,12 +598,12 @@ AdaptiveExecutor(CustomScanState *node)
DistributedExecution *execution = NULL;
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
bool randomAccess = true;
bool interTransactions = false;
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
Job *job = distributedPlan->workerJob;
List *taskList = job->taskList;
@ -609,22 +619,35 @@ AdaptiveExecutor(CustomScanState *node)
ExecuteSubPlans(distributedPlan);
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
/* defer decision after ExecuteSubPlans() */
targetPoolSize = 1;
}
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
execution = CreateDistributedExecution(distributedPlan->modLevel, taskList,
distributedPlan->hasReturning, paramListInfo,
tupleDescriptor,
scanState->tuplestorestate, targetPoolSize);
/*
* Make sure that we acquire the appropriate locks even if the local tasks
* are going to be executed with local execution.
*/
StartDistributedExecution(execution);
/* execute tasks local to the node (if any) */
if (list_length(execution->localTaskList) > 0)
{
RunLocalExecution(scanState, execution);
/* make sure that we only execute remoteTaskList afterwards */
AdjustDistributedExecutionAfterLocalExecution(execution);
}
if (ShouldRunTasksSequentially(execution->tasksToExecute))
{
SequentialRunDistributedExecution(execution);
@ -636,8 +659,23 @@ AdaptiveExecutor(CustomScanState *node)
if (distributedPlan->modLevel != ROW_MODIFY_READONLY)
{
if (list_length(execution->localTaskList) == 0)
{
Assert(executorState->es_processed == 0);
executorState->es_processed = execution->rowsProcessed;
}
else if (distributedPlan->targetRelationId != InvalidOid &&
PartitionMethod(distributedPlan->targetRelationId) != DISTRIBUTE_BY_NONE)
{
/*
* For reference tables we already add rowsProcessed during the local execution;
* this is required to ensure that mixed local/remote executions report
* the accurate number of rowsProcessed to the user.
*/
executorState->es_processed += execution->rowsProcessed;
}
}
FinishDistributedExecution(execution);
@ -650,6 +688,52 @@ AdaptiveExecutor(CustomScanState *node)
}
/*
* RunLocalExecution runs the localTaskList in the execution, fills the tuplestore
* and sets the es_processed if necessary.
*
* It also sorts the tuplestore if there are no remote tasks remaining.
*/
static void
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
{
uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList);
EState *executorState = NULL;
LocalExecutionHappened = true;
/*
* We're deliberately not setting execution->rowsProcessed here. The main reason
* is that modifications to reference tables would end up setting it both here
* and in AdaptiveExecutor. Instead, we set executorState here and skip updating it
* for reference table modifications in AdaptiveExecutor.
*/
executorState = ScanStateGetExecutorState(scanState);
executorState->es_processed = rowsProcessed;
}
/*
* AdjustDistributedExecutionAfterLocalExecution simply updates the necessary fields of
* the distributed execution.
*/
static void
AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
{
/*
* Local execution already stores the tuples for returning, so we should not
* store again.
*/
execution->hasReturning = false;
/* we only need to execute the remote tasks */
execution->tasksToExecute = execution->remoteTaskList;
execution->totalTaskCount = list_length(execution->remoteTaskList);
execution->unfinishedTaskCount = list_length(execution->remoteTaskList);
}
/*
* ExecuteUtilityTaskListWithoutResults is a wrapper around executing task
* list for utility commands. If the adaptive executor is enabled, the function
@ -705,6 +789,12 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
DistributedExecution *execution = NULL;
ParamListInfo paramListInfo = NULL;
/*
* The code-paths that rely on this function do not know how to execute
* commands locally.
*/
ErrorIfLocalExecutionHappened();
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
targetPoolSize = 1;
@ -726,7 +816,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
* CreateDistributedExecution creates a distributed execution data structure for
* a distributed plan.
*/
static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize)
@ -738,6 +828,9 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
execution->tasksToExecute = taskList;
execution->hasReturning = hasReturning;
execution->localTaskList = NIL;
execution->remoteTaskList = NIL;
execution->executionStats =
(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
execution->paramListInfo = paramListInfo;
@ -757,6 +850,14 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
if (ShouldExecuteTasksLocally(taskList))
{
bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &execution->localTaskList,
&execution->remoteTaskList);
}
return execution;
}
@ -774,11 +875,20 @@ StartDistributedExecution(DistributedExecution *execution)
if (MultiShardCommitProtocol != COMMIT_PROTOCOL_BARE)
{
/*
* In case LocalExecutionHappened, we simply force the executor to use 2PC.
* The primary motivation is that at this point we're definitely expanding
* the nodes participating in the transaction. And, by re-generating the
* remote task lists during local query execution, we might prevent the adaptive
* executor from kicking in 2PC (or even from starting a coordinated transaction);
* that's why we prefer adding this check here instead of in
* Activate2PCIfModifyingTransactionExpandsToNewNode().
*/
if (DistributedExecutionRequiresRollback(execution) || LocalExecutionHappened)
{
BeginOrContinueCoordinatedTransaction();
if (TaskListRequires2PC(taskList) || LocalExecutionHappened)
{
/*
* Although using two phase commit protocol is an independent decision than
@ -848,7 +958,7 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution)
* DistributedPlanModifiesDatabase returns true if the plan modifies the data
* or the schema.
*/
bool
DistributedPlanModifiesDatabase(DistributedPlan *plan)
{
return TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList);
@ -1252,7 +1362,6 @@ AssignTasksToConnections(DistributedExecution *execution)
shardCommandExecution->expectResults = hasReturning ||
modLevel == ROW_MODIFY_READONLY;
foreach(taskPlacementCell, task->taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
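The 2PC escalation in StartDistributedExecution() above is visible through pg_dist_transaction, as the multi_mx_transaction_recovery expected output further below shows. A rough sketch, assuming test_recovery is a distributed table on an MX worker where the first INSERT hits a shard on that worker and the second one hits a remote shard:

BEGIN;
INSERT INTO test_recovery VALUES ('hello');  -- executed locally
INSERT INTO test_recovery VALUES ('world');  -- executed remotely; 2PC is forced
COMMIT;
-- a single prepared-transaction record is written for the remote node
SELECT count(*) FROM pg_dist_transaction;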


@ -14,6 +14,7 @@
#include "distributed/commands/multi_copy.h" #include "distributed/commands/multi_copy.h"
#include "distributed/insert_select_executor.h" #include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
@ -74,6 +75,16 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
/*
* INSERT .. SELECT via coordinator consists of two steps: a SELECT is
* followed by a COPY. If the SELECT is executed locally, then the COPY
* would fail since Citus currently doesn't know how to handle COPY
* locally. So, to prevent the command from failing, we simply disable local
* execution.
*/
DisableLocalExecution();
/*
* If we are dealing with partitioned table, we also need to lock its
* partitions. Here we only lock targetRelation, we acquire necessary
@ -84,7 +95,6 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
LockPartitionRelations(targetRelationId, RowExclusiveLock);
}
if (distributedPlan->workerJob != NULL)
{
/*


@ -0,0 +1,490 @@
/*
* local_executor.c
*
* The scope of the local execution is executing queries locally on the
* shards. In other words, local execution does not deal with any local tables
* that are not shards on the node where the query is being executed. In that sense,
* the local executor is only triggered if the node has both the metadata and the
* shards (e.g., only Citus MX worker nodes).
*
* The goal of the local execution is to skip the unnecessary network round-trip
* that the node would otherwise make to itself. Instead, we identify the locally
* executable tasks and simply call PostgreSQL's planner and executor.
*
* The local executor is an extension of the adaptive executor. So, the executor uses
* adaptive executor's custom scan nodes.
*
* One thing to note is that Citus MX is only supported with replication factor = 1,
* so keep that in mind while reading the comments below.
*
* At a high level, there are 3 slightly different ways of utilizing local execution:
*
* (1) Execution of local single shard queries of a distributed table
*
* This is the simplest case. The local executor kicks in at the start of the
* adaptive executor, and since the query consists of only a single task the
* execution finishes without going to the network at all.
*
* Even if there is a transaction block (or recursively planned CTEs), as long
* as the queries hit shards on the same node, the local execution will kick in.
*
* (2) Execution of local single-shard queries and remote multi-shard queries
*
* The rule is simple. If a transaction block starts with a local query execution,
* all the other queries in the same transaction block that touch any local shard
* have to use the local execution. Although this sounds restrictive, we prefer to
* implement it this way; otherwise we'd end up with scenarios as complex as the
* ones we have in connection management due to foreign keys.
*
* See the following example:
* BEGIN;
* -- assume that the query is executed locally
* SELECT count(*) FROM test WHERE key = 1;
*
* -- at this point, all the tasks whose shards reside on this
* -- node are executed locally one-by-one. After those finish,
* -- the remaining tasks are handled by the adaptive executor
* SELECT count(*) FROM test;
*
*
* (3) Modifications of reference tables
*
* Modifications to reference tables have to be executed on all nodes. So, after the
* local execution, the adaptive executor continues the execution on the other
* nodes.
*
* Note that for read-only queries, after the local execution, there is no need to
* kick in the adaptive executor.
*
* There are also a few limitations/trade-offs that are worth mentioning. First,
* local execution on multiple shards might be slow because the execution has to
* happen one task at a time (e.g., no parallelism). Second, if a transaction
* block/CTE starts with a multi-shard command, we do not use local query execution
* since local execution is sequential. Basically, we do not want to lose parallelism
* across local tasks by switching to local execution. Third, the local execution
* currently only supports queries. In other words, any utility command like TRUNCATE
* fails if it is executed after a local execution inside a transaction block.
* Fourth, the local execution cannot be mixed with executors other than the adaptive
* executor, namely the task-tracker, real-time and router executors. Finally, related
* to the previous item, the COPY command cannot be mixed with local execution in a
* transaction. The implication is that no part of INSERT..SELECT via the coordinator
* can happen via local execution.
*/
#include "postgres.h"
#include "miscadmin.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/metadata_cache.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
#include "distributed/multi_router_executor.h"
#include "distributed/transaction_management.h"
#include "executor/tstoreReceiver.h"
#include "executor/tuptable.h"
#include "optimizer/planner.h"
#include "nodes/params.h"
#include "utils/snapmgr.h"
/* controlled via a GUC */
bool EnableLocalExecution = true;
bool LogLocalCommands = false;
bool LocalExecutionHappened = false;
static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList,
List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan,
char *queryString);
static bool TaskAccessesLocalNode(Task *task);
static void LogLocalCommand(const char *command);
/*
* ExecuteLocalTaskList gets a CitusScanState node and a list of local tasks.
*
* The function goes over the task list and executes them locally.
* The returned tuples (if any) are stored in the CitusScanState.
*
* The function returns totalRowsProcessed.
*/
uint64
ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
{
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info);
int numParams = 0;
Oid *parameterTypes = NULL;
ListCell *taskCell = NULL;
uint64 totalRowsProcessed = 0;
if (paramListInfo != NULL)
{
const char **parameterValues = NULL; /* not used anywhere, so declare here */
ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
&parameterValues);
numParams = paramListInfo->numParams;
}
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
PlannedStmt *localPlan = NULL;
int cursorOptions = 0;
const char *shardQueryString = task->queryString;
Query *shardQuery = ParseQueryString(shardQueryString, parameterTypes, numParams);
/*
* We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of
* intermediate results in the query. That'd trigger ExecuteLocalTaskPlan()
* to go through the distributed executor, which we do not want since the
* query is already known to be local.
*/
cursorOptions = 0;
/*
* Although the shardQuery is local to this node, we prefer planner()
* over standard_planner(). The primary reason for that is that Citus itself
* is not very tolerant of standard_planner() calls that don't go through
* distributed_planner(), because of the way that restriction hooks are
* implemented. So, we let planner() call distributed_planner(), which
* eventually calls standard_planner().
*/
localPlan = planner(shardQuery, cursorOptions, paramListInfo);
LogLocalCommand(shardQueryString);
totalRowsProcessed +=
ExecuteLocalTaskPlan(scanState, localPlan, task->queryString);
}
return totalRowsProcessed;
}
/*
* ExtractLocalAndRemoteTasks gets a taskList and generates two
* task lists, namely localTaskList and remoteTaskList. The function goes
* over the input taskList and puts the tasks that are local to the node
* into localTaskList and the rest into remoteTaskList. Either of
* the lists could be NIL depending on the input taskList.
*
* One slightly different case is modifications to replicated tables
* (e.g., reference tables) where a single task ends up as two separate tasks:
* the local task is added to localTaskList and the remaining placements to
* remoteTaskList.
*/
void
ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
List **remoteTaskList)
{
ListCell *taskCell = NULL;
*remoteTaskList = NIL;
*localTaskList = NIL;
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
List *localTaskPlacementList = NULL;
List *remoteTaskPlacementList = NULL;
SplitLocalAndRemotePlacements(task->taskPlacementList, &localTaskPlacementList,
&remoteTaskPlacementList);
/* either the local or the remote should be non-nil */
Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
if (list_length(task->taskPlacementList) == 1)
{
/*
* At this point, the task has a single placement (e.g., the anchor shard is
* a distributed table's shard). So, it is either added to the local or the
* remote taskList.
*/
if (localTaskPlacementList == NIL)
{
*remoteTaskList = lappend(*remoteTaskList, task);
}
else
{
*localTaskList = lappend(*localTaskList, task);
}
}
else
{
Task *localTask = copyObject(task);
Task *remoteTask = NULL;
/*
* At this point, we're dealing with reference tables or intermediate results
* where the task has placements on both local and remote nodes. We always
* prefer to use local placement, and require remote placements only for
* modifications.
*/
localTask->taskPlacementList = localTaskPlacementList;
*localTaskList = lappend(*localTaskList, localTask);
if (readOnly)
{
/* read-only tasks should only be executed on the local machine */
}
else
{
remoteTask = copyObject(task);
remoteTask->taskPlacementList = remoteTaskPlacementList;
*remoteTaskList = lappend(*remoteTaskList, remoteTask);
}
}
}
}
/*
* SplitLocalAndRemotePlacements is a helper function which iterates over the input
* taskPlacementList and puts each placement into the corresponding list, either
* localTaskPlacementList or remoteTaskPlacementList.
*/
static void
SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList,
List **remoteTaskPlacementList)
{
ListCell *placementCell = NULL;
int32 localGroupId = GetLocalGroupId();
*localTaskPlacementList = NIL;
*remoteTaskPlacementList = NIL;
foreach(placementCell, taskPlacementList)
{
ShardPlacement *taskPlacement =
(ShardPlacement *) lfirst(placementCell);
if (taskPlacement->groupId == localGroupId)
{
*localTaskPlacementList = lappend(*localTaskPlacementList, taskPlacement);
}
else
{
*remoteTaskPlacementList = lappend(*remoteTaskPlacementList, taskPlacement);
}
}
}
/*
* ExecuteLocalTaskPlan gets a planned statement which can be executed locally.
* The function simply follows the steps of a local execution and fills the
* tupleStore if necessary. The function returns the number of rows processed,
* which is only tracked for commands other than SELECT.
*/
static uint64
ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString)
{
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
DestReceiver *tupleStoreDestReceiever = CreateDestReceiver(DestTuplestore);
ScanDirection scanDirection = ForwardScanDirection;
QueryEnvironment *queryEnv = create_queryEnv();
QueryDesc *queryDesc = NULL;
int eflags = 0;
uint64 totalRowsProcessed = 0;
/*
* Use the tupleStore provided by the scanState because it is shared across
* the other task executions and the adaptive executor.
*/
SetTuplestoreDestReceiverParams(tupleStoreDestReceiever,
scanState->tuplestorestate,
CurrentMemoryContext, false);
/* Create a QueryDesc for the query */
queryDesc = CreateQueryDesc(taskPlan, queryString,
GetActiveSnapshot(), InvalidSnapshot,
tupleStoreDestReceiever, paramListInfo,
queryEnv, 0);
ExecutorStart(queryDesc, eflags);
ExecutorRun(queryDesc, scanDirection, 0L, true);
/*
* We'll set the executorState->es_processed later, for now only remember
* the count.
*/
if (taskPlan->commandType != CMD_SELECT)
{
totalRowsProcessed = queryDesc->estate->es_processed;
}
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc);
return totalRowsProcessed;
}
/*
* ShouldExecuteTasksLocally gets a task list and returns true if
* any of the tasks should be executed locally. This function does not
* guarantee that any task will actually be executed locally.
*/
bool
ShouldExecuteTasksLocally(List *taskList)
{
bool singleTask = false;
if (!EnableLocalExecution)
{
return false;
}
if (LocalExecutionHappened)
{
/*
* For various reasons, including the transaction visibility
* rules (e.g., read-your-own-writes), we have to use local
* execution again if it has already happened within this
* transaction block.
*
* We might error out later in the execution if it is not suitable
* to execute the tasks locally.
*/
Assert(IsMultiStatementTransaction() || InCoordinatedTransaction());
/*
* TODO: A future improvement could be to keep track of which placements
* have been locally executed. At this point, only use local execution for
* those placements. That'd help to benefit more from parallelism.
*/
return true;
}
singleTask = (list_length(taskList) == 1);
if (singleTask && TaskAccessesLocalNode((Task *) linitial(taskList)))
{
/*
* This is the most valuable time to use local execution. We are likely
* to avoid any network round-trips by simply executing the command within
* this session.
*
* We cannot avoid network round-trips if the task is not a read-only
* task and accesses multiple placements. For example, modifications to
* distributed tables (with replication factor == 1) would avoid network
* round-trips. However, modifications to reference tables still need to
* go over the network to do the modification on the other placements.
* Still, we'll be avoiding the network round-trip for this node.
*
* Note that we shouldn't use local execution if any distributed execution
* has happened because that'd break transaction visibility rules and
* many other things.
*/
return !AnyConnectionAccessedPlacements();
}
if (!singleTask)
{
/*
* For multi-task executions, switching to local execution would likely
* perform poorly, because we'd lose the parallelism. Note that local
* execution happens one task at a time (e.g., similar to sequential
* distributed execution).
*/
Assert(!LocalExecutionHappened);
return false;
}
return false;
}
/*
* TaskAccessesLocalNode returns true if any placement of the task resides on
* the node that is executing the query.
*/
static bool
TaskAccessesLocalNode(Task *task)
{
ListCell *placementCell = NULL;
int localGroupId = GetLocalGroupId();
foreach(placementCell, task->taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(placementCell);
if (taskPlacement->groupId == localGroupId)
{
return true;
}
}
return false;
}
/*
* ErrorIfLocalExecutionHappened() errors out if a local query has already been executed
* in the same transaction.
*
* This check is required because Citus currently hasn't implemented local execution
* infrastructure for all the commands/executors. As local execution is implemented
* for each such command/executor, the corresponding call to this function should simply be removed.
*/
void
ErrorIfLocalExecutionHappened(void)
{
if (LocalExecutionHappened)
{
ereport(ERROR, (errmsg("cannot execute command because a local execution has "
"already been done in the transaction"),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.enable_local_execution TO OFF;\""),
errdetail("Some parallel commands cannot be executed if a "
"previous command has already been executed locally")));
}
}
/*
* LogLocalCommand logs commands executed locally on this node. Although we're
* talking about local execution, the function also honors the citus.log_remote_commands
* GUC. This makes sense because the local execution is still on a shard of a
* distributed table, meaning it is part of a distributed execution.
*/
static void
LogLocalCommand(const char *command)
{
if (!(LogRemoteCommands || LogLocalCommands))
{
return;
}
ereport(LOG, (errmsg("executing the command locally: %s",
ApplyLogRedaction(command))));
}
/*
* DisableLocalExecution is simply a C interface for
* setting the following:
*      SET LOCAL citus.enable_local_execution TO off;
*/
void
DisableLocalExecution(void)
{
set_config_option("citus.enable_local_execution", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
}
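To observe which statements the local executor takes over, the citus.log_local_commands GUC added in this commit can be enabled. A minimal sketch, again with a hypothetical dist_table; the LOG line is illustrative only, since the rewritten shard query and shard name depend on the table:

SET citus.log_local_commands TO on;
SELECT count(*) FROM dist_table WHERE key = 1;
-- LOG:  executing the command locally: <rewritten single-shard query>
-- to opt out of local execution for a transaction block instead:
-- SET LOCAL citus.enable_local_execution TO off;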


@ -397,7 +397,7 @@ void
ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
DestReceiver *dest)
{
Query *query = ParseQueryString(queryString, NULL, 0);
ExecuteQueryIntoDestReceiver(query, params, dest);
}
@ -407,11 +407,12 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params
* ParseQueryString parses a query string and returns a Query struct.
*/
Query *
ParseQueryString(const char *queryString, Oid *paramOids, int numParams)
{
Query *query = NULL;
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
List *queryTreeList =
pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL);
if (list_length(queryTreeList) != 1)
{


@ -26,6 +26,7 @@
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/local_executor.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
@ -1035,6 +1036,9 @@ RealTimeExecScan(CustomScanState *node)
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
ErrorIfLocalExecutionHappened();
DisableLocalExecution();
/* we are taking locks on partitions of partitioned tables */ /* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);


@ -32,6 +32,7 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -588,6 +589,9 @@ RouterModifyExecScan(CustomScanState *node)
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
bool parallelExecution = true; bool parallelExecution = true;
ErrorIfLocalExecutionHappened();
DisableLocalExecution();
ExecuteSubPlans(distributedPlan); ExecuteSubPlans(distributedPlan);
if (list_length(taskList) <= 1 || if (list_length(taskList) <= 1 ||
@ -866,6 +870,9 @@ RouterSelectExecScan(CustomScanState *node)
Job *workerJob = distributedPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
ErrorIfLocalExecutionHappened();
DisableLocalExecution();
/* we are taking locks on partitions of partitioned tables */ /* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);


@ -28,6 +28,7 @@
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
@ -3008,6 +3009,9 @@ TaskTrackerExecScan(CustomScanState *node)
Job *workerJob = distributedPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery; Query *jobQuery = workerJob->jobQuery;
ErrorIfLocalExecutionHappened();
DisableLocalExecution();
if (ContainsReadIntermediateResultFunction((Node *) jobQuery)) if (ContainsReadIntermediateResultFunction((Node *) jobQuery))
{ {
ereport(ERROR, (errmsg("Complex subqueries and CTEs are not supported when " ereport(ERROR, (errmsg("Complex subqueries and CTEs are not supported when "


@ -26,6 +26,7 @@
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/distributed_deadlock_detection.h" #include "distributed/distributed_deadlock_detection.h"
#include "distributed/local_executor.h"
#include "distributed/maintenanced.h" #include "distributed/maintenanced.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
@ -430,6 +431,17 @@ RegisterCitusConfigVariables(void)
0, 0,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_local_execution",
gettext_noop("Enables queries on shards that are local to the current node "
"to be planned and executed locally."),
NULL,
&EnableLocalExecution,
true,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_single_hash_repartition_joins",
gettext_noop("Enables single hash repartitioning between hash "
@ -509,6 +521,17 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.log_local_commands",
gettext_noop("Log queries that are executed locally, can be overridden by "
"citus.log_remote_commands"),
NULL,
&LogLocalCommands,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.log_distributed_deadlock_detection",
gettext_noop("Log distributed deadlock detection related processing in "


@ -23,6 +23,8 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/intermediate_results.h" #include "distributed/intermediate_results.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
@ -225,6 +227,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
LocalExecutionHappened = false;
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
@ -278,6 +281,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
LocalExecutionHappened = false;
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;


@ -85,7 +85,7 @@ worker_execute_sql_task(PG_FUNCTION_ARGS)
StringInfo jobDirectoryName = JobDirectoryName(jobId);
StringInfo taskFilename = UserTaskFilename(jobDirectoryName, taskId);
query = ParseQueryString(queryString, NULL, 0);
tuplesSent = WorkerExecuteSqlTask(query, taskFilename->data, binaryCopyFormat);
PG_RETURN_INT64(tuplesSent);


@ -0,0 +1,30 @@
/*-------------------------------------------------------------------------
*
* local_executor.h
* Functions and global variables to control local query execution.
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef LOCAL_EXECUTION_H
#define LOCAL_EXECUTION_H
#include "distributed/citus_custom_scan.h"
/* enabled with GUCs */
extern bool EnableLocalExecution;
extern bool LogLocalCommands;
extern bool LocalExecutionHappened;
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList);
extern bool ShouldExecuteTasksLocally(List *taskList);
extern void ErrorIfLocalExecutionHappened(void);
extern void DisableLocalExecution(void);
extern bool AnyTaskAccessesRemoteNode(List *taskList);
#endif /* LOCAL_EXECUTION_H */


@ -50,7 +50,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
tupleDescriptor, Tuplestorestate *tupstore);
extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams);
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
params,
DestReceiver *dest);
@ -61,6 +61,7 @@ extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo pa
extern void SetLocalMultiShardModifyModeToSequential(void);
extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState);
extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
#endif /* MULTI_EXECUTOR_H */


@ -67,6 +67,7 @@ extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
extern void ResetShardPlacementAssociation(struct MultiConnection *connection);
extern void InitPlacementConnectionManagement(void);
extern bool AnyConnectionAccessedPlacements(void);
extern bool ConnectionModifiedPlacement(MultiConnection *connection);
extern bool ConnectionUsedForAnyPlacements(MultiConnection *connection);

File diff suppressed because it is too large

File diff suppressed because it is too large


@ -23,6 +23,7 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32744;
-- now single-row INSERT to the other worker
\c - - - :worker_2_port
\set VERBOSITY terse
INSERT INTO limit_orders_mx VALUES (32745, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32745;
@ -77,22 +78,15 @@ ERROR: cannot perform an INSERT with NULL in the partition column
INSERT INTO limit_orders_mx VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
-5.00);
ERROR: new row for relation "limit_orders_mx_1220092" violates check constraint "limit_orders_mx_limit_price_check"
DETAIL: Failing row contains (18811, BUD, 14962, 2014-04-05 08:32:16, sell, -5.00).
CONTEXT: while executing command on localhost:57637
-- INSERT violating primary key constraint
INSERT INTO limit_orders_mx VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58);
ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093"
DETAIL: Key (id)=(32743) already exists.
CONTEXT: while executing command on localhost:57638
-- INSERT violating primary key constraint, with RETURNING specified.
INSERT INTO limit_orders_mx VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *;
ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093"
DETAIL: Key (id)=(32743) already exists.
CONTEXT: while executing command on localhost:57638
-- INSERT, with RETURNING specified, failing with a non-constraint error
INSERT INTO limit_orders_mx VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
ERROR: division by zero
CONTEXT: while executing command on localhost:57638
SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported
INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
@ -221,8 +215,6 @@ UPDATE limit_orders_mx SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RET
INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093"
DETAIL: Key (id)=(275) already exists.
CONTEXT: while executing command on localhost:57638
-- multi shard update is supported
UPDATE limit_orders_mx SET limit_price = 0.00;
-- attempting to change the partition key is unsupported
@ -294,8 +286,6 @@ CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS
'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT;
UPDATE limit_orders_mx SET bidder_id = temp_strict_func(1, null) WHERE id = 246;
ERROR: null value in column "bidder_id" violates not-null constraint
DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 0.00, {1,2}).
CONTEXT: while executing command on localhost:57637
SELECT array_of_values FROM limit_orders_mx WHERE id = 246;
array_of_values
-----------------


@ -137,6 +137,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a
-- and the other way around is also allowed
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
COMMIT;
@ -158,6 +159,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a
-- and the other way around is also allowed
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
COMMIT;
@ -166,6 +168,7 @@ COMMIT;
-- this logic doesn't apply to router SELECTs occurring after a modification:
-- selecting from the modified node is fine...
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
count
@ -176,6 +179,7 @@ SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
ABORT;
-- doesn't apply to COPY after modifications
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
\copy labs_mx from stdin delimiter ','
COMMIT;
@ -198,7 +202,6 @@ INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO objects_mx VALUES (1, 'orange');
ERROR: duplicate key value violates unique constraint "objects_mx_pkey_1220103"
DETAIL: Key (id)=(1) already exists.
CONTEXT: while executing command on localhost:57637
COMMIT;
-- data shouldn't have persisted...
SELECT * FROM objects_mx WHERE id = 1;
@ -282,6 +285,7 @@ AFTER INSERT ON labs_mx_1220102
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO objects_mx VALUES (2, 'BAD');
ERROR: illegal value
@ -332,6 +336,7 @@ DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
-- should be the same story as before, just at COMMIT time
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO objects_mx VALUES (2, 'BAD');
INSERT INTO labs_mx VALUES (9, 'Umbrella Corporation');
@ -358,6 +363,7 @@ AFTER INSERT ON labs_mx_1220102
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO objects_mx VALUES (2, 'BAD');
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
@ -383,6 +389,7 @@ SELECT * FROM labs_mx WHERE id = 8;
\c - - - :worker_1_port
DROP TRIGGER reject_bad_mx ON objects_mx_1220103;
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
INSERT INTO labs_mx VALUES (9, 'BAD');

View File

@ -101,6 +101,7 @@ SELECT count(*) FROM pg_dist_transaction;
-- Multi-statement transactions should write 2 transaction recovery records
BEGIN;
SET LOCAL citus.enable_local_execution TO false;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;
@ -116,6 +117,26 @@ SELECT recover_prepared_transactions();
0
(1 row)
-- the same transaction block, but this time
-- enable local execution as well. The first
-- command is locally executed, the second
-- is remote, so 1 entry is expected
BEGIN;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
-------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
SELECT count(*) FROM pg_dist_transaction;

View File

@ -31,7 +31,7 @@ test: recursive_dml_queries_mx multi_mx_truncate_from_worker
test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: multi_mx_metadata
test: multi_mx_modifications test: multi_mx_modifications local_shard_execution
test: multi_mx_transaction_recovery
test: multi_mx_modifying_xacts
test: multi_mx_explain

View File

@ -0,0 +1,670 @@
CREATE SCHEMA local_shard_execution;
SET search_path TO local_shard_execution;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
SET citus.next_shard_id TO 1470000;
CREATE TABLE reference_table (key int PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table (key int PRIMARY KEY , value text, age bigint CHECK (age > 10), FOREIGN KEY (key) REFERENCES reference_table(key) ON DELETE CASCADE);
SELECT create_distributed_table('distributed_table','key');
CREATE TABLE second_distributed_table (key int PRIMARY KEY , value text, FOREIGN KEY (key) REFERENCES distributed_table(key) ON DELETE CASCADE);
SELECT create_distributed_table('second_distributed_table','key');
-- ingest some data to enable some tests with data
INSERT INTO reference_table VALUES (1);
INSERT INTO distributed_table VALUES (1, '1', 20);
INSERT INTO second_distributed_table VALUES (1, '1');
-- a simple test for
CREATE TABLE collections_list (
key int,
ts timestamptz,
collection_id integer,
value numeric,
PRIMARY KEY(key, collection_id)
) PARTITION BY LIST (collection_id );
SELECT create_distributed_table('collections_list', 'key');
CREATE TABLE collections_list_0
PARTITION OF collections_list (key, ts, collection_id, value)
FOR VALUES IN ( 0 );
-- connect to a worker and get ready for the tests
\c - - - :worker_1_port
SET search_path TO local_shard_execution;
-- returns true if, given a distribution key filter
-- on the distributed table (e.g., WHERE key = 1), we'd hit a shard
-- placement that is local to this node
CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$
DECLARE shard_is_local BOOLEAN := FALSE;
BEGIN
WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_execution.distributed_table', dist_key)),
all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group))
SELECT
true INTO shard_is_local
FROM
local_shard_ids
WHERE
get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node);
IF shard_is_local IS NULL THEN
shard_is_local = FALSE;
END IF;
RETURN shard_is_local;
END;
$$ LANGUAGE plpgsql;
-- pick some example values whose shards reside locally and remotely
-- distribution key values of 1, 6, 500 and 701 map to shards that are LOCAL to this node,
-- we'll use these values in the tests
SELECT shard_of_distribution_column_is_local(1);
SELECT shard_of_distribution_column_is_local(6);
SELECT shard_of_distribution_column_is_local(500);
SELECT shard_of_distribution_column_is_local(701);
-- distribution key values of 11 and 12 are REMOTE to shards
SELECT shard_of_distribution_column_is_local(11);
SELECT shard_of_distribution_column_is_local(12);
--- enable logging to see which tasks are executed locally
SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
-- first, make sure that local execution works fine
-- with simple queries that are not in transaction blocks
SELECT count(*) FROM distributed_table WHERE key = 1;
-- multiple tasks, both of which are local, should NOT use local execution
-- because local execution runs the tasks sequentially on this node, so the executor
-- favors parallel execution even if everything is local to the node
SELECT count(*) FROM distributed_table WHERE key IN (1,6);
-- queries that hit any remote shards should NOT use local execution
SELECT count(*) FROM distributed_table WHERE key IN (1,11);
SELECT count(*) FROM distributed_table;
-- modifications also follow the same rules
INSERT INTO reference_table VALUES (1) ON CONFLICT DO NOTHING;
INSERT INTO distributed_table VALUES (1, '1', 21) ON CONFLICT DO NOTHING;
-- local query
DELETE FROM distributed_table WHERE key = 1 AND age = 21;
-- hitting multiple shards, so should be a distributed execution
DELETE FROM distributed_table WHERE age = 21;
-- although both shards are local, the executor chooses parallel execution
-- over the wire because, as noted above, local execution is sequential
DELETE FROM second_distributed_table WHERE key IN (1,6);
-- similarly, any multi-shard query just follows distributed execution
DELETE FROM second_distributed_table;
-- load some more data for the following tests
INSERT INTO second_distributed_table VALUES (1, '1');
-- INSERT .. SELECT hitting a single (co-located) shard should
-- be executed locally
INSERT INTO distributed_table
SELECT
distributed_table.*
FROM
distributed_table, second_distributed_table
WHERE
distributed_table.key = 1 and distributed_table.key=second_distributed_table.key
ON CONFLICT(key) DO UPDATE SET value = '22'
RETURNING *;
-- INSERT .. SELECT hitting multiple shards should go through distributed execution
INSERT INTO distributed_table
SELECT
distributed_table.*
FROM
distributed_table, second_distributed_table
WHERE
distributed_table.key != 1 and distributed_table.key=second_distributed_table.key
ON CONFLICT(key) DO UPDATE SET value = '22'
RETURNING *;
-- INSERT..SELECT via the coordinator consists of two steps, SELECT + COPY,
-- which is why local execution is disallowed even if the SELECT
-- could be executed locally
INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING;
INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING;
-- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution
INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING;
-- EXPLAIN for local execution just works fine,
-- though it goes through distributed execution
EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
-- TODO: Fix #2922
-- EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
-- TODO: Fix #2922
-- EXPLAIN ANALYZE DELETE FROM distributed_table WHERE key = 1 AND age = 20;
-- show that EXPLAIN ANALYZE deleted the row
SELECT * FROM distributed_table WHERE key = 1 AND age = 20 ORDER BY 1,2,3;
-- copy always happens via distributed execution irrespective of the
-- shards that are accessed
COPY reference_table FROM STDIN;
6
11
\.
COPY distributed_table FROM STDIN WITH CSV;
6,'6',25
11,'11',121
\.
COPY second_distributed_table FROM STDIN WITH CSV;
6,'6'
\.
-- the behaviour in transaction blocks is the following:
-- (a) Unless the first query is a local query, always use distributed execution.
-- (b) If the executor has used local execution, it has to use local execution
-- for the remainder of the transaction block. If that's not possible, the
-- executor has to error out (e.g., TRUNCATE is a utility command and we
-- currently do not support local execution of utility commands)
-- rollback should be able to roll back local execution
BEGIN;
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
ROLLBACK;
-- make sure that the value is rolled back
SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
-- rollback should be able to roll back both the local and distributed executions
BEGIN;
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
DELETE FROM distributed_table;
-- DELETE should cascade, and we should not see any rows
SELECT count(*) FROM second_distributed_table;
ROLLBACK;
-- make sure that everything is rolled back
SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
SELECT count(*) FROM second_distributed_table;
-- very simple examples; the SELECTs should see the modifications
-- that were done before
BEGIN;
-- INSERT is executed locally
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '23' RETURNING *;
-- since the INSERT is executed locally, the SELECT should also be
-- executed locally and see the changes
SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
-- multi-shard SELECTs are now forced to use local execution on
-- the shards that reside on this node
SELECT * FROM distributed_table WHERE value = '23' ORDER BY 1,2,3;
-- similarly, multi-shard modifications should use local execution
-- on the shards that reside on this node
DELETE FROM distributed_table WHERE value = '23';
-- make sure that the value is deleted
SELECT * FROM distributed_table WHERE value = '23' ORDER BY 1,2,3;
COMMIT;
-- make sure that we've committed everything
SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
-- if we start with a distributed execution, we should keep
-- using that and never switch back to local execution
BEGIN;
DELETE FROM distributed_table WHERE value = '11';
-- although this command could have been executed
-- locally, it is not going to be executed locally
SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
-- but we can still execute parallel queries, even if
-- they are utility commands
TRUNCATE distributed_table CASCADE;
-- TRUNCATE cascaded into second_distributed_table
SELECT count(*) FROM second_distributed_table;
ROLLBACK;
-- load some data so that foreign keys won't complain with the next tests
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
-- a very similar set of operations, but this time use
-- COPY as the first command
BEGIN;
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
-- this could go through local execution, but doesn't because we've already
-- done distributed execution
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
-- utility commands could still use distributed execution
TRUNCATE distributed_table CASCADE;
-- ensure that TRUNCATE made it
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
ROLLBACK;
-- show that cascading foreign keys just works fine with local execution
BEGIN;
INSERT INTO reference_table VALUES (701);
INSERT INTO distributed_table VALUES (701, '701', 701);
INSERT INTO second_distributed_table VALUES (701, '701');
DELETE FROM reference_table WHERE key = 701;
SELECT count(*) FROM distributed_table WHERE key = 701;
SELECT count(*) FROM second_distributed_table WHERE key = 701;
-- multi-shard commands should also see the changes
SELECT count(*) FROM distributed_table WHERE key > 700;
-- we can still do multi-shard commands
DELETE FROM distributed_table;
ROLLBACK;
-- multiple queries hitting different shards can be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
SELECT count(*) FROM distributed_table WHERE key = 6;
SELECT count(*) FROM distributed_table WHERE key = 500;
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
TRUNCATE distributed_table CASCADE;
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
-- no need to even supply any data
COPY distributed_table FROM STDIN WITH CSV;
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i;
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
INSERT INTO distributed_table (key) SELECT key+1 FROM distributed_table;
ROLLBACK;
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
BEGIN;
DELETE FROM distributed_table WHERE key = 1;
EXPLAIN ANALYZE DELETE FROM distributed_table WHERE key = 1;
ROLLBACK;
BEGIN;
INSERT INTO distributed_table VALUES (11, '111',29) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
-- this is already disallowed on the nodes, adding it in case we
-- support DDLs from the worker nodes in the future
ALTER TABLE distributed_table ADD COLUMN x INT;
ROLLBACK;
BEGIN;
INSERT INTO distributed_table VALUES (11, '111',29) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
-- this is already disallowed because VACUUM cannot be executed in tx block
-- adding in case this is supported some day
VACUUM second_distributed_table;
ROLLBACK;
-- make sure that functions can use local execution
CREATE OR REPLACE PROCEDURE only_local_execution() AS $$
DECLARE cnt INT;
BEGIN
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
SELECT count(*) INTO cnt FROM distributed_table WHERE key = 1;
DELETE FROM distributed_table WHERE key = 1;
END;
$$ LANGUAGE plpgsql;
CALL only_local_execution();
CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$
DECLARE cnt INT;
BEGIN
INSERT INTO distributed_table VALUES ($1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
SELECT count(*) INTO cnt FROM distributed_table WHERE key = $1;
DELETE FROM distributed_table WHERE key = $1;
END;
$$ LANGUAGE plpgsql;
CALL only_local_execution_with_params(1);
CREATE OR REPLACE PROCEDURE local_execution_followed_by_dist() AS $$
DECLARE cnt INT;
BEGIN
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
SELECT count(*) INTO cnt FROM distributed_table WHERE key = 1;
DELETE FROM distributed_table;
SELECT count(*) INTO cnt FROM distributed_table;
END;
$$ LANGUAGE plpgsql;
CALL local_execution_followed_by_dist();
-- test CTEs, including modifying CTEs
WITH local_insert AS (INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *),
distributed_local_mixed AS (SELECT * FROM reference_table WHERE key IN (SELECT key FROM local_insert))
SELECT * FROM local_insert, distributed_local_mixed;
-- since we start with parallel execution, we do not switch back to local execution in the
-- latter CTEs
WITH distributed_local_mixed AS (SELECT * FROM distributed_table),
local_insert AS (INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *)
SELECT * FROM local_insert, distributed_local_mixed ORDER BY 1,2,3,4,5;
-- router CTE pushdown
WITH all_data AS (SELECT * FROM distributed_table WHERE key = 1)
SELECT
count(*)
FROM
distributed_table, all_data
WHERE
distributed_table.key = all_data.key AND distributed_table.key = 1;
INSERT INTO reference_table VALUES (2);
INSERT INTO distributed_table VALUES (2, '29', 29);
INSERT INTO second_distributed_table VALUES (2, '29');
-- a single-shard query that is not local, followed by a local query
WITH all_data AS (SELECT * FROM second_distributed_table WHERE key = 2)
SELECT
distributed_table.key
FROM
distributed_table, all_data
WHERE
distributed_table.value = all_data.value AND distributed_table.key = 1
ORDER BY
1 DESC;
-- multi-shard CTE is followed by a query which could be executed locally, but
-- since the query started with a parallel query, it doesn't use local execution
WITH all_data AS (SELECT * FROM distributed_table)
SELECT
count(*)
FROM
distributed_table, all_data
WHERE
distributed_table.key = all_data.key AND distributed_table.key = 1;
-- get ready for the next commands
TRUNCATE reference_table, distributed_table, second_distributed_table;
-- local execution of RETURNING for reference tables
INSERT INTO reference_table VALUES (1),(2),(3),(4),(5),(6) RETURNING *;
-- local execution of multi-row INSERTs
INSERT INTO distributed_table VALUES (1, '11',21), (5,'55',22) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
-- distributed execution of multi-row INSERTs, where part of the execution
-- could have been done via local execution but the executor chooses otherwise
-- because the command is a multi-shard query
INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
PREPARE local_prepare_no_param AS SELECT count(*) FROM distributed_table WHERE key = 1;
PREPARE local_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key = $1;
PREPARE remote_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key != $1;
BEGIN;
-- 6 local executions without params
EXECUTE local_prepare_no_param;
EXECUTE local_prepare_no_param;
EXECUTE local_prepare_no_param;
EXECUTE local_prepare_no_param;
EXECUTE local_prepare_no_param;
EXECUTE local_prepare_no_param;
-- 6 local executions with params
EXECUTE local_prepare_param(1);
EXECUTE local_prepare_param(5);
EXECUTE local_prepare_param(6);
EXECUTE local_prepare_param(1);
EXECUTE local_prepare_param(5);
EXECUTE local_prepare_param(6);
-- followed by a non-local execution
EXECUTE remote_prepare_param(1);
COMMIT;
-- failures of local execution should roll back both the
-- local and remote executions
-- fail on a local execution
BEGIN;
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '100' RETURNING *;
UPDATE distributed_table SET value = '200';
INSERT INTO distributed_table VALUES (1, '100',21) ON CONFLICT(key) DO UPDATE SET value = (1 / (100.0 - EXCLUDED.value::int))::text RETURNING *;
ROLLBACK;
-- we've rolled back everything
SELECT count(*) FROM distributed_table WHERE value = '200';
-- RETURNING should just work fine for reference tables
INSERT INTO reference_table VALUES (500) RETURNING *;
DELETE FROM reference_table WHERE key = 500 RETURNING *;
-- should be able to skip local execution even if in a sequential mode of execution
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential ;
DELETE FROM distributed_table;
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '100' RETURNING *;
ROLLBACK;
-- sequential execution should just work fine after a local execution
BEGIN;
SET citus.multi_shard_modify_mode TO sequential ;
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '100' RETURNING *;
DELETE FROM distributed_table;
ROLLBACK;
-- load some data so that foreign keys won't complain with the next tests
TRUNCATE reference_table CASCADE;
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
-- show that both local and mixed local-distributed executions
-- calculate rows processed correctly
BEGIN;
DELETE FROM distributed_table WHERE key = 500;
DELETE FROM distributed_table WHERE value != '123123213123213';
ROLLBACK;
BEGIN;
DELETE FROM reference_table WHERE key = 500 RETURNING *;
DELETE FROM reference_table;
ROLLBACK;
-- mixing with other executors should fail
-- router modify execution should error
BEGIN;
DELETE FROM distributed_table WHERE key = 500;
SET LOCAL citus.task_executor_type = 'real-time';
DELETE FROM distributed_table;
ROLLBACK;
-- local execution should not be used
-- because a multi-shard router query has already been executed
BEGIN;
SET LOCAL citus.task_executor_type = 'real-time';
DELETE FROM distributed_table;
DELETE FROM distributed_table WHERE key = 500;
ROLLBACK;
-- router select execution
BEGIN;
DELETE FROM distributed_table WHERE key = 500;
SET LOCAL citus.task_executor_type = 'real-time';
SELECT count(*) FROM distributed_table WHERE key = 500;
ROLLBACK;
-- local execution should not be used
-- because a single-shard router query has already been executed
BEGIN;
SET LOCAL citus.task_executor_type = 'real-time';
SELECT count(*) FROM distributed_table WHERE key = 500;
DELETE FROM distributed_table WHERE key = 500;
ROLLBACK;
-- real-time select execution
BEGIN;
DELETE FROM distributed_table WHERE key = 500;
SET LOCAL citus.task_executor_type = 'real-time';
SELECT count(*) FROM distributed_table;
ROLLBACK;
-- local execution should not be used
-- because a real-time query has already been executed
BEGIN;
SET LOCAL citus.task_executor_type = 'real-time';
SELECT count(*) FROM distributed_table;
DELETE FROM distributed_table WHERE key = 500;
ROLLBACK;
-- task-tracker select execution
BEGIN;
DELETE FROM distributed_table WHERE key = 500;
SET LOCAL citus.task_executor_type = 'task-tracker';
SELECT count(*) FROM distributed_table;
ROLLBACK;
-- local execution should not be used
-- because a task-tracker query has already been executed
BEGIN;
SET LOCAL citus.task_executor_type = 'task-tracker';
SET LOCAL client_min_messages TO INFO;
SELECT count(*) FROM distributed_table;
SET LOCAL client_min_messages TO LOG;
DELETE FROM distributed_table WHERE key = 500;
ROLLBACK;
-- probably not a realistic case since views are not very
-- well supported with MX
CREATE VIEW v_local_query_execution AS
SELECT * FROM distributed_table WHERE key = 500;
SELECT * FROM v_local_query_execution;
-- similar test, but this time the view itself is a non-local
-- query, but the query on the view is local
CREATE VIEW v_local_query_execution_2 AS
SELECT * FROM distributed_table;
SELECT * FROM v_local_query_execution_2 WHERE key = 500;
-- even if we switch from remote execution -> local execution,
-- we are able to use remote execution after rollback
BEGIN;
SAVEPOINT my_savepoint;
SELECT count(*) FROM distributed_table;
DELETE FROM distributed_table WHERE key = 500;
ROLLBACK TO SAVEPOINT my_savepoint;
DELETE FROM distributed_table WHERE key = 500;
COMMIT;
-- even if we switch from local execution -> remote execution,
-- we are able to use local execution after rollback
BEGIN;
SAVEPOINT my_savepoint;
DELETE FROM distributed_table WHERE key = 500;
SELECT count(*) FROM distributed_table;
ROLLBACK TO SAVEPOINT my_savepoint;
DELETE FROM distributed_table WHERE key = 500;
COMMIT;
-- sanity check: local execution on partitions
BEGIN;
INSERT INTO collections_list (key, collection_id) VALUES (1,0);
SELECT count(*) FROM collections_list_0 WHERE key = 1;
SELECT count(*) FROM collections_list;
COMMIT;
-- the final queries for the following CTEs are going to happen on the intermediate results only
-- one of them will be executed remotely, and the other locally
-- Citus currently doesn't allow using task_assignment_policy for intermediate results
WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed;
\c - - - :master_port
SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_execution CASCADE;

View File

@ -19,6 +19,8 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32744;
-- now single-row INSERT to the other worker
\c - - - :worker_2_port
\set VERBOSITY terse
INSERT INTO limit_orders_mx VALUES (32745, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32745;

View File

@ -120,6 +120,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a
-- and the other way around is also allowed
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
COMMIT;
@ -136,6 +137,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a
-- and the other way around is also allowed
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
COMMIT;
@ -146,12 +148,14 @@ COMMIT;
-- this logic doesn't apply to router SELECTs occurring after a modification:
-- selecting from the modified node is fine...
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
ABORT;
-- doesn't apply to COPY after modifications
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
\copy labs_mx from stdin delimiter ','
10,Weyland-Yutani-1 10,Weyland-Yutani-1
@ -241,6 +245,7 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO objects_mx VALUES (2, 'BAD');
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
@ -279,6 +284,7 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
-- should be the same story as before, just at COMMIT time
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO objects_mx VALUES (2, 'BAD');
INSERT INTO labs_mx VALUES (9, 'Umbrella Corporation');
@ -297,6 +303,7 @@ DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO objects_mx VALUES (2, 'BAD');
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
@ -313,6 +320,7 @@ SELECT * FROM labs_mx WHERE id = 8;
DROP TRIGGER reject_bad_mx ON objects_mx_1220103;
BEGIN;
SET LOCAL citus.enable_local_execution TO off;
INSERT INTO objects_mx VALUES (1, 'apple');
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
INSERT INTO labs_mx VALUES (9, 'BAD');

View File

@ -64,6 +64,18 @@ SELECT count(*) FROM pg_dist_transaction;
-- Multi-statement transactions should write 2 transaction recovery records
BEGIN;
SET LOCAL citus.enable_local_execution TO false;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- the same transaction block, but this time
-- enable local execution as well. The first
-- command is locally executed, the second
-- is remote, so 1 entry is expected
BEGIN;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;