mirror of https://github.com/citusdata/citus.git
Introduce the concept of Local Execution
The scope of the local execution is locally executing the queries on the
shards. In other words, local execution does not deal with any local tables
that are not shards on the node that the query is being executed on. In that
sense, the local executor is only triggered if the node has both the metadata
and the shards (e.g., only Citus MX worker nodes).

The goal of the local execution is to skip the unnecessary network round-trip
happening on the node itself. Instead, identify the locally executable tasks
and simply call PostgreSQL's planner and executor.

The local executor is an extension of the adaptive executor, so it uses the
adaptive executor's custom scan nodes.

One thing to note is that Citus MX is only supported with replication
factor = 1, so keep that in mind while reading the notes below.

On a high level, there are 3 slightly different ways of utilizing local execution:

(1) Execution of local single-shard queries of a distributed table

    This is the simplest case. The executor kicks in at the start of the adaptive
    executor, and since the query is only a single task the execution finishes
    without going to the network at all.

    Even if there is a transaction block (or recursively planned CTEs), as long
    as the queries hit shards on the same node, local execution will kick in.

(2) Execution of local single-shard queries and remote multi-shard queries

    The rule is simple. If a transaction block starts with a local query execution,
    all the other queries in the same transaction block that touch any local shard
    have to use local execution. Although this sounds restrictive, we prefer to
    implement it this way, otherwise we'd end up with scenarios as complex as the
    ones we have in connection management due to foreign keys.

    See the following example:
        BEGIN;
            -- assume that the query is executed locally
            SELECT count(*) FROM test WHERE key = 1;

            -- at this point, all the shards that reside on the
            -- node are executed locally one-by-one. After those finish,
            -- the remaining tasks are handled by the adaptive executor
            SELECT count(*) FROM test;

(3) Modifications of reference tables

    Modifications to reference tables have to be executed on all nodes. So, after
    the local execution, the adaptive executor continues the execution on the
    other nodes.

    Note that for read-only queries, after the local execution, there is no need
    to kick in the adaptive executor.

There are also a few limitations/trade-offs that are worth mentioning. First,
local execution on multiple shards might be slow because the execution has to
happen one task at a time (e.g., no parallelism). Second, if a transaction
block/CTE starts with a multi-shard command, we do not use local query execution
since local execution is sequential. Basically, we do not want to lose parallelism
across local tasks by switching to local execution. Third, local execution
currently only supports queries. In other words, any utility command like TRUNCATE
fails if it is executed after a local execution inside a transaction block.
Fourth, local execution cannot be mixed with executors other than the adaptive
executor, namely the task-tracker, real-time and router executors. Finally,
related to the previous item, the COPY command cannot be mixed with local
execution in a transaction. The implication is that no part of INSERT..SELECT
via coordinator can happen via local execution.

pull/2938/head
parent d69be38932
commit 0b0c779c77
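To make the limitations above concrete, here is a rough psql sketch (the distributed table test and its placement on the local node are assumptions for illustration, not part of this change). Once a query in a transaction block has been executed locally, a later utility command such as TRUNCATE is rejected with the error and hint introduced by this commit:

    BEGIN;
    -- single-shard query, assumed to hit a shard on this MX worker: executed locally
    SELECT count(*) FROM test WHERE key = 1;
    TRUNCATE test;
    -- ERROR:  cannot execute command because a local execution has already been done in the transaction
    -- HINT:  Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
    ROLLBACK;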
@@ -71,6 +71,7 @@
 #include "distributed/commands/multi_copy.h"
 #include "distributed/commands/utility_hook.h"
 #include "distributed/intermediate_results.h"
+#include "distributed/local_executor.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_partitioning_utils.h"
@@ -2233,6 +2234,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 	const char *delimiterCharacter = "\t";
 	const char *nullPrintCharacter = "\\N";
 
+	/* Citus currently doesn't know how to handle COPY command locally */
+	ErrorIfLocalExecutionHappened();
+
 	/* look up table properties */
 	distributedRelation = heap_open(tableId, RowExclusiveLock);
 	cacheEntry = DistributedTableCacheEntry(tableId);
@@ -31,14 +31,17 @@
 #include "access/attnum.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
 #include "commands/dbcommands.h"
+#include "commands/defrem.h"
 #include "commands/tablecmds.h"
 #include "distributed/colocation_utils.h"
 #include "distributed/commands.h"
 #include "distributed/commands/multi_copy.h"
 #include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */
+#include "distributed/local_executor.h"
 #include "distributed/maintenanced.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
@@ -140,6 +143,49 @@ multi_ProcessUtility(PlannedStmt *pstmt,
 		return;
 	}
 
+	if (IsA(parsetree, ExplainStmt) &&
+		IsA(((ExplainStmt *) parsetree)->query, Query))
+	{
+		ExplainStmt *explainStmt = (ExplainStmt *) parsetree;
+
+		if (IsTransactionBlock())
+		{
+			ListCell *optionCell = NULL;
+			bool analyze = false;
+
+			foreach(optionCell, explainStmt->options)
+			{
+				DefElem *option = (DefElem *) lfirst(optionCell);
+
+				if (strcmp(option->defname, "analyze") == 0)
+				{
+					analyze = defGetBoolean(option);
+				}
+
+				/* don't "break", as explain.c will use the last value */
+			}
+
+			if (analyze)
+			{
+				/*
+				 * Since we cannot execute EXPLAIN ANALYZE locally, we
+				 * cannot continue.
+				 */
+				ErrorIfLocalExecutionHappened();
+			}
+		}
+
+		/*
+		 * EXPLAIN ANALYZE is tricky with local execution, and there is not
+		 * much difference between the local and distributed execution in terms
+		 * of the actual EXPLAIN output.
+		 *
+		 * TODO: It might be nice to have a way to show that the query is locally
+		 * executed. Shall we add an INFO output?
+		 */
+		DisableLocalExecution();
+	}
+
 	if (IsA(parsetree, CreateSubscriptionStmt))
 	{
 		CreateSubscriptionStmt *createSubStmt = (CreateSubscriptionStmt *) parsetree;
 
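As a hedged illustration of the EXPLAIN handling above (the table name is assumed): outside a transaction block the utility hook only disables local execution for the statement, but inside a transaction block that has already used local execution, EXPLAIN ... ANALYZE reaches ErrorIfLocalExecutionHappened() and errors out:

    BEGIN;
    SELECT count(*) FROM test WHERE key = 1;                    -- executed locally
    EXPLAIN (ANALYZE) SELECT count(*) FROM test WHERE key = 1;
    -- ERROR:  cannot execute command because a local execution has already been done in the transaction
    ROLLBACK;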
@@ -1259,3 +1259,19 @@ ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
 		return 0;
 	}
 }
+
+
+/*
+ * AnyConnectionAccessedPlacements simply checks the number of entries in
+ * ConnectionPlacementHash. This is useful to detect whether we're in a
+ * distributed transaction and already executed at least one command that
+ * accessed a placement.
+ */
+bool
+AnyConnectionAccessedPlacements(void)
+{
+	/* this is initialized on PG_INIT */
+	Assert(ConnectionPlacementHash != NULL);
+
+	return hash_get_num_entries(ConnectionPlacementHash) > 0;
+}
@@ -131,6 +131,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/citus_custom_scan.h"
 #include "distributed/connection_management.h"
+#include "distributed/local_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_physical_planner.h"
@@ -161,7 +162,14 @@ typedef struct DistributedExecution
 	/* the corresponding distributed plan's modLevel */
 	RowModifyLevel modLevel;
 
+	/*
+	 * tasksToExecute contains all the tasks required to finish the execution, and
+	 * it is the union of remoteTaskList and localTaskList. After (if any) local
+	 * tasks are executed, remoteTaskList becomes equivalent to tasksToExecute.
+	 */
 	List *tasksToExecute;
+	List *remoteTaskList;
+	List *localTaskList;
 
 	/* the corresponding distributed plan has RETURNING */
 	bool hasReturning;
@@ -519,6 +527,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
 														  Tuplestorestate *tupleStore,
 														  int targetPoolSize);
 static void StartDistributedExecution(DistributedExecution *execution);
+static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
 static void RunDistributedExecution(DistributedExecution *execution);
 static bool ShouldRunTasksSequentially(List *taskList);
 static void SequentialRunDistributedExecution(DistributedExecution *execution);
@@ -528,8 +537,9 @@ static void CleanUpSessions(DistributedExecution *execution);
 
 static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
 static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
+static void AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *
+														   execution);
 static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
-static bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
 static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
 static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
 static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
@@ -588,12 +598,12 @@ AdaptiveExecutor(CustomScanState *node)
 	DistributedExecution *execution = NULL;
 	EState *executorState = ScanStateGetExecutorState(scanState);
 	ParamListInfo paramListInfo = executorState->es_param_list_info;
-	Tuplestorestate *tupleStore = NULL;
 	TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
 	bool randomAccess = true;
 	bool interTransactions = false;
 	int targetPoolSize = MaxAdaptiveExecutorPoolSize;
 
+
 	Job *job = distributedPlan->workerJob;
 	List *taskList = job->taskList;
 
@@ -609,22 +619,35 @@ AdaptiveExecutor(CustomScanState *node)
 
 	ExecuteSubPlans(distributedPlan);
 
-	scanState->tuplestorestate =
-		tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
-	tupleStore = scanState->tuplestorestate;
-
 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 	{
+		/* defer decision after ExecuteSubPlans() */
 		targetPoolSize = 1;
 	}
 
-	execution = CreateDistributedExecution(distributedPlan->modLevel, taskList,
-										   distributedPlan->hasReturning,
-										   paramListInfo, tupleDescriptor,
-										   tupleStore, targetPoolSize);
+	scanState->tuplestorestate =
+		tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
 
+	execution = CreateDistributedExecution(distributedPlan->modLevel, taskList,
+										   distributedPlan->hasReturning, paramListInfo,
+										   tupleDescriptor,
+										   scanState->tuplestorestate, targetPoolSize);
+
+	/*
+	 * Make sure that we acquire the appropriate locks even if the local tasks
+	 * are going to be executed with local execution.
+	 */
 	StartDistributedExecution(execution);
 
+	/* execute tasks local to the node (if any) */
+	if (list_length(execution->localTaskList) > 0)
+	{
+		RunLocalExecution(scanState, execution);
+
+		/* make sure that we only execute remoteTaskList afterwards */
+		AdjustDistributedExecutionAfterLocalExecution(execution);
+	}
+
 	if (ShouldRunTasksSequentially(execution->tasksToExecute))
 	{
 		SequentialRunDistributedExecution(execution);
@@ -636,7 +659,22 @@ AdaptiveExecutor(CustomScanState *node)
 
 	if (distributedPlan->modLevel != ROW_MODIFY_READONLY)
 	{
-		executorState->es_processed = execution->rowsProcessed;
+		if (list_length(execution->localTaskList) == 0)
+		{
+			Assert(executorState->es_processed == 0);
+
+			executorState->es_processed = execution->rowsProcessed;
+		}
+		else if (distributedPlan->targetRelationId != InvalidOid &&
+				 PartitionMethod(distributedPlan->targetRelationId) != DISTRIBUTE_BY_NONE)
+		{
+			/*
+			 * For reference tables we already add rowsProcessed on the local execution,
+			 * this is required to ensure that mixed local/remote executions report
+			 * the accurate number of rowsProcessed to the user.
+			 */
+			executorState->es_processed += execution->rowsProcessed;
+		}
 	}
 
 	FinishDistributedExecution(execution);
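A sketch of the row-count accounting the new branches are about, assuming ref_table is a reference table with a placement on this node (illustrative only): the local placement is executed by RunLocalExecution(), which already sets es_processed, so for a reference-table modification the remote placements executed afterwards are not added again; only for distributed tables with mixed local and remote tasks are the remote rows added on top:

    BEGIN;
    SELECT count(*) FROM ref_table;                         -- local placement, executed locally
    UPDATE ref_table SET value = value + 1 WHERE key = 1;
    -- UPDATE 1   (local placement counted once; the same row's remote placements
    --             are modified by the adaptive executor but not re-counted)
    COMMIT;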
@@ -650,6 +688,52 @@ AdaptiveExecutor(CustomScanState *node)
 }
 
 
+/*
+ * RunLocalExecution runs the localTaskList in the execution, fills the tuplestore
+ * and sets the es_processed if necessary.
+ *
+ * It also sorts the tuplestore if there are no remote tasks remaining.
+ */
+static void
+RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
+{
+	uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList);
+	EState *executorState = NULL;
+
+	LocalExecutionHappened = true;
+
+	/*
+	 * We're deliberately not setting execution->rowsProcessed here. The main reason
+	 * is that modifications to reference tables would end up setting it both here
+	 * and in AdaptiveExecutor. Instead, we set executorState here and skip updating it
+	 * for reference table modifications in AdaptiveExecutor.
+	 */
+	executorState = ScanStateGetExecutorState(scanState);
+	executorState->es_processed = rowsProcessed;
+}
+
+
+/*
+ * AdjustDistributedExecutionAfterLocalExecution simply updates the necessary fields of
+ * the distributed execution.
+ */
+static void
+AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
+{
+	/*
+	 * Local execution already stores the tuples for returning, so we should not
+	 * store again.
+	 */
+	execution->hasReturning = false;
+
+	/* we only need to execute the remote tasks */
+	execution->tasksToExecute = execution->remoteTaskList;
+
+	execution->totalTaskCount = list_length(execution->remoteTaskList);
+	execution->unfinishedTaskCount = list_length(execution->remoteTaskList);
+}
+
+
 /*
  * ExecuteUtilityTaskListWithoutResults is a wrapper around executing task
  * list for utility commands. If the adaptive executor is enabled, the function
@@ -705,6 +789,12 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
 	DistributedExecution *execution = NULL;
 	ParamListInfo paramListInfo = NULL;
 
+	/*
+	 * The code-paths that rely on this function do not know how to execute
+	 * commands locally.
+	 */
+	ErrorIfLocalExecutionHappened();
+
 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 	{
 		targetPoolSize = 1;
@@ -726,7 +816,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
  * CreateDistributedExecution creates a distributed execution data structure for
  * a distributed plan.
  */
-DistributedExecution *
+static DistributedExecution *
 CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning,
 						   ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
 						   Tuplestorestate *tupleStore, int targetPoolSize)
@@ -738,6 +828,9 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
 	execution->tasksToExecute = taskList;
 	execution->hasReturning = hasReturning;
 
+	execution->localTaskList = NIL;
+	execution->remoteTaskList = NIL;
+
 	execution->executionStats =
 		(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
 	execution->paramListInfo = paramListInfo;
@@ -757,6 +850,14 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
 	execution->connectionSetChanged = false;
 	execution->waitFlagsChanged = false;
 
+	if (ShouldExecuteTasksLocally(taskList))
+	{
+		bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
+
+		ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &execution->localTaskList,
+								   &execution->remoteTaskList);
+	}
+
 	return execution;
 }
 
@@ -774,11 +875,20 @@ StartDistributedExecution(DistributedExecution *execution)
 
 	if (MultiShardCommitProtocol != COMMIT_PROTOCOL_BARE)
 	{
-		if (DistributedExecutionRequiresRollback(execution))
+		/*
+		 * In case localExecutionHappened, we simply force the executor to use 2PC.
+		 * The primary motivation is that at this point we're definitely expanding
+		 * the nodes participating in the transaction. And, by re-generating the
+		 * remote task lists during local query execution, we might prevent the adaptive
+		 * executor from kicking in 2PC (or even starting a coordinated transaction;
+		 * that's why we prefer adding this check here instead of
+		 * Activate2PCIfModifyingTransactionExpandsToNewNode()).
+		 */
+		if (DistributedExecutionRequiresRollback(execution) || LocalExecutionHappened)
 		{
 			BeginOrContinueCoordinatedTransaction();
 
-			if (TaskListRequires2PC(taskList))
+			if (TaskListRequires2PC(taskList) || LocalExecutionHappened)
 			{
 				/*
 				 * Although using two phase commit protocol is an independent decision than
@@ -848,7 +958,7 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution)
  * DistributedPlanModifiesDatabase returns true if the plan modifies the data
  * or the schema.
  */
-static bool
+bool
 DistributedPlanModifiesDatabase(DistributedPlan *plan)
 {
 	return TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList);
@@ -1252,7 +1362,6 @@ AssignTasksToConnections(DistributedExecution *execution)
 		shardCommandExecution->expectResults = hasReturning ||
 											   modLevel == ROW_MODIFY_READONLY;
 
-
 		foreach(taskPlacementCell, task->taskPlacementList)
 		{
 			ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
@@ -14,6 +14,7 @@
 #include "distributed/commands/multi_copy.h"
 #include "distributed/insert_select_executor.h"
 #include "distributed/insert_select_planner.h"
+#include "distributed/local_executor.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
@@ -74,6 +75,16 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
 
 	ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
 
+
+	/*
+	 * INSERT .. SELECT via coordinator consists of two steps, a SELECT is
+	 * followed by a COPY. If the SELECT is executed locally, then the COPY
+	 * would fail since Citus currently doesn't know how to handle COPY
+	 * locally. So, to prevent the command from failing, we simply disable local
+	 * execution.
+	 */
+	DisableLocalExecution();
+
 	/*
 	 * If we are dealing with partitioned table, we also need to lock its
 	 * partitions. Here we only lock targetRelation, we acquire necessary
@@ -84,7 +95,6 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
 		LockPartitionRelations(targetRelationId, RowExclusiveLock);
 	}
 
-
 	if (distributedPlan->workerJob != NULL)
 	{
 		/*
@@ -0,0 +1,490 @@
+/*
+ * local_executor.c
+ *
+ * The scope of the local execution is locally executing the queries on the
+ * shards. In other words, local execution does not deal with any local tables
+ * that are not shards on the node that the query is being executed on. In that
+ * sense, the local executor is only triggered if the node has both the metadata
+ * and the shards (e.g., only Citus MX worker nodes).
+ *
+ * The goal of the local execution is to skip the unnecessary network round-trip
+ * happening on the node itself. Instead, identify the locally executable tasks and
+ * simply call PostgreSQL's planner and executor.
+ *
+ * The local executor is an extension of the adaptive executor, so it uses the
+ * adaptive executor's custom scan nodes.
+ *
+ * One thing to note is that Citus MX is only supported with replication factor = 1,
+ * so keep that in mind while reading the comments below.
+ *
+ * On a high level, there are 3 slightly different ways of utilizing local execution:
+ *
+ * (1) Execution of local single-shard queries of a distributed table
+ *
+ *      This is the simplest case. The executor kicks in at the start of the adaptive
+ *      executor, and since the query is only a single task the execution finishes
+ *      without going to the network at all.
+ *
+ *      Even if there is a transaction block (or recursively planned CTEs), as long
+ *      as the queries hit shards on the same node, local execution will kick in.
+ *
+ * (2) Execution of local single-shard queries and remote multi-shard queries
+ *
+ *      The rule is simple. If a transaction block starts with a local query execution,
+ *      all the other queries in the same transaction block that touch any local shard
+ *      have to use local execution. Although this sounds restrictive, we prefer to
+ *      implement it this way, otherwise we'd end up with scenarios as complex as the
+ *      ones we have in connection management due to foreign keys.
+ *
+ *      See the following example:
+ *          BEGIN;
+ *              -- assume that the query is executed locally
+ *              SELECT count(*) FROM test WHERE key = 1;
+ *
+ *              -- at this point, all the shards that reside on the
+ *              -- node are executed locally one-by-one. After those finish,
+ *              -- the remaining tasks are handled by the adaptive executor
+ *              SELECT count(*) FROM test;
+ *
+ *
+ * (3) Modifications of reference tables
+ *
+ *      Modifications to reference tables have to be executed on all nodes. So, after
+ *      the local execution, the adaptive executor continues the execution on the other
+ *      nodes.
+ *
+ *      Note that for read-only queries, after the local execution, there is no need to
+ *      kick in the adaptive executor.
+ *
+ *  There are also a few limitations/trade-offs that are worth mentioning. First,
+ *  local execution on multiple shards might be slow because the execution has to
+ *  happen one task at a time (e.g., no parallelism). Second, if a transaction
+ *  block/CTE starts with a multi-shard command, we do not use local query execution
+ *  since local execution is sequential. Basically, we do not want to lose parallelism
+ *  across local tasks by switching to local execution. Third, local execution
+ *  currently only supports queries. In other words, any utility command like TRUNCATE
+ *  fails if it is executed after a local execution inside a transaction block.
+ *  Fourth, local execution cannot be mixed with executors other than the adaptive
+ *  executor, namely the task-tracker, real-time and router executors. Finally, related
+ *  to the previous item, the COPY command cannot be mixed with local execution in a
+ *  transaction. The implication is that no part of INSERT..SELECT via coordinator
+ *  can happen via local execution.
+ */
+#include "postgres.h"
+#include "miscadmin.h"
+
+#include "distributed/citus_custom_scan.h"
+#include "distributed/local_executor.h"
+#include "distributed/multi_executor.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/relation_access_tracking.h"
+#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
+#include "distributed/multi_router_executor.h"
+#include "distributed/transaction_management.h"
+#include "executor/tstoreReceiver.h"
+#include "executor/tuptable.h"
+#include "optimizer/planner.h"
+#include "nodes/params.h"
+#include "utils/snapmgr.h"
+
+
+/* controlled via a GUC */
+bool EnableLocalExecution = true;
+bool LogLocalCommands = false;
+
+bool LocalExecutionHappened = false;
+
+
+static void SplitLocalAndRemotePlacements(List *taskPlacementList,
+										  List **localTaskPlacementList,
+										  List **remoteTaskPlacementList);
+static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan,
+								   char *queryString);
+static bool TaskAccessesLocalNode(Task *task);
+static void LogLocalCommand(const char *command);
+
+
+/*
+ * ExecuteLocalTaskList gets a CitusScanState node and list of local tasks.
+ *
+ * The function goes over the task list and executes them locally.
+ * The returning tuples (if any) are stored in the CitusScanState.
+ *
+ * The function returns totalRowsProcessed.
+ */
+uint64
+ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
+{
+	EState *executorState = ScanStateGetExecutorState(scanState);
+	ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info);
+	int numParams = 0;
+	Oid *parameterTypes = NULL;
+	ListCell *taskCell = NULL;
+	uint64 totalRowsProcessed = 0;
+
+	if (paramListInfo != NULL)
+	{
+		const char **parameterValues = NULL; /* not used anywhere, so declare here */
+
+		ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
+										   &parameterValues);
+
+		numParams = paramListInfo->numParams;
+	}
+
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+
+		PlannedStmt *localPlan = NULL;
+		int cursorOptions = 0;
+		const char *shardQueryString = task->queryString;
+		Query *shardQuery = ParseQueryString(shardQueryString, parameterTypes, numParams);
+
+		/*
+		 * We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of
+		 * intermediate results in the query. That'd trigger ExecuteLocalTaskPlan()
+		 * to go through the distributed executor, which we do not want since the
+		 * query is already known to be local.
+		 */
+		cursorOptions = 0;
+
+		/*
+		 * Although the shardQuery is local to this node, we prefer planner()
+		 * over standard_planner(). The primary reason for that is Citus itself
+		 * is not very tolerant of standard_planner() calls that don't go through
+		 * distributed_planner() because of the way that restriction hooks are
+		 * implemented. So, let planner call distributed_planner() which
+		 * eventually calls standard_planner().
+		 */
+		localPlan = planner(shardQuery, cursorOptions, paramListInfo);
+
+		LogLocalCommand(shardQueryString);
+
+		totalRowsProcessed +=
+			ExecuteLocalTaskPlan(scanState, localPlan, task->queryString);
+	}
+
+	return totalRowsProcessed;
+}
+
+
+/*
+ * ExtractLocalAndRemoteTasks gets a taskList and generates two
+ * task lists namely localTaskList and remoteTaskList. The function goes
+ * over the input taskList and puts the tasks that are local to the node
+ * into localTaskList and the remaining into the remoteTaskList. Either of
+ * the lists could be NIL depending on the input taskList.
+ *
+ * One slightly different case is modifications to replicated tables
+ * (e.g., reference tables) where a single task ends in two separate tasks
+ * and the local task is added to localTaskList and the remaining ones to the
+ * remoteTaskList.
+ */
+void
+ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
+						   List **remoteTaskList)
+{
+	ListCell *taskCell = NULL;
+
+	*remoteTaskList = NIL;
+	*localTaskList = NIL;
+
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+
+		List *localTaskPlacementList = NULL;
+		List *remoteTaskPlacementList = NULL;
+
+		SplitLocalAndRemotePlacements(task->taskPlacementList, &localTaskPlacementList,
+									  &remoteTaskPlacementList);
+
+		/* either the local or the remote should be non-nil */
+		Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
+
+		if (list_length(task->taskPlacementList) == 1)
+		{
+			/*
+			 * At this point, the task has a single placement (e.g., anchor shard is
+			 * distributed table's shard). So, it is either added to local or remote
+			 * taskList.
+			 */
+			if (localTaskPlacementList == NIL)
+			{
+				*remoteTaskList = lappend(*remoteTaskList, task);
+			}
+			else
+			{
+				*localTaskList = lappend(*localTaskList, task);
+			}
+		}
+		else
+		{
+			Task *localTask = copyObject(task);
+			Task *remoteTask = NULL;
+
+			/*
+			 * At this point, we're dealing with reference tables or intermediate results
+			 * where the task has placements on both local and remote nodes. We always
+			 * prefer to use local placement, and require remote placements only for
+			 * modifications.
+			 */
+
+			localTask->taskPlacementList = localTaskPlacementList;
+			*localTaskList = lappend(*localTaskList, localTask);
+
+			if (readOnly)
+			{
+				/* read-only tasks should only be executed on the local machine */
+			}
+			else
+			{
+				remoteTask = copyObject(task);
+				remoteTask->taskPlacementList = remoteTaskPlacementList;
+
+				*remoteTaskList = lappend(*remoteTaskList, remoteTask);
+			}
+		}
+	}
+}
+
+
+/*
+ * SplitLocalAndRemotePlacements is a helper function which iterates over the input
+ * taskPlacementList and puts the placements into the corresponding list of either
+ * localTaskPlacementList or remoteTaskPlacementList.
+ */
+static void
+SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList,
+							  List **remoteTaskPlacementList)
+{
+	ListCell *placementCell = NULL;
+	int32 localGroupId = GetLocalGroupId();
+
+	*localTaskPlacementList = NIL;
+	*remoteTaskPlacementList = NIL;
+
+	foreach(placementCell, taskPlacementList)
+	{
+		ShardPlacement *taskPlacement =
+			(ShardPlacement *) lfirst(placementCell);
+
+		if (taskPlacement->groupId == localGroupId)
+		{
+			*localTaskPlacementList = lappend(*localTaskPlacementList, taskPlacement);
+		}
+		else
+		{
+			*remoteTaskPlacementList = lappend(*remoteTaskPlacementList, taskPlacement);
+		}
+	}
+}
+
+
+/*
+ * ExecuteLocalTaskPlan gets a planned statement which can be executed locally.
+ * The function simply follows the steps to have a local execution, sets the
+ * tupleStore if necessary. The function returns the number of rows processed.
+ */
+static uint64
+ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString)
+{
+	EState *executorState = ScanStateGetExecutorState(scanState);
+	ParamListInfo paramListInfo = executorState->es_param_list_info;
+	DestReceiver *tupleStoreDestReceiever = CreateDestReceiver(DestTuplestore);
+	ScanDirection scanDirection = ForwardScanDirection;
+	QueryEnvironment *queryEnv = create_queryEnv();
+	QueryDesc *queryDesc = NULL;
+	int eflags = 0;
+	uint64 totalRowsProcessed = 0;
+
+	/*
+	 * Use the tupleStore provided by the scanState because it is shared across
+	 * the other task executions and the adaptive executor.
+	 */
+	SetTuplestoreDestReceiverParams(tupleStoreDestReceiever,
+									scanState->tuplestorestate,
+									CurrentMemoryContext, false);
+
+	/* Create a QueryDesc for the query */
+	queryDesc = CreateQueryDesc(taskPlan, queryString,
+								GetActiveSnapshot(), InvalidSnapshot,
+								tupleStoreDestReceiever, paramListInfo,
+								queryEnv, 0);
+
+	ExecutorStart(queryDesc, eflags);
+	ExecutorRun(queryDesc, scanDirection, 0L, true);
+
+	/*
+	 * We'll set the executorState->es_processed later, for now only remember
+	 * the count.
+	 */
+	if (taskPlan->commandType != CMD_SELECT)
+	{
+		totalRowsProcessed = queryDesc->estate->es_processed;
+	}
+
+	ExecutorFinish(queryDesc);
+	ExecutorEnd(queryDesc);
+
+	FreeQueryDesc(queryDesc);
+
+	return totalRowsProcessed;
+}
+
+
+/*
+ * ShouldExecuteTasksLocally gets a task list and returns true if any of
+ * the tasks should be executed locally. This function does not
+ * guarantee that any task has to be executed locally.
+ */
+bool
+ShouldExecuteTasksLocally(List *taskList)
+{
+	bool singleTask = false;
+
+	if (!EnableLocalExecution)
+	{
+		return false;
+	}
+
+	if (LocalExecutionHappened)
+	{
+		/*
+		 * For various reasons, including the transaction visibility
+		 * rules (e.g., read-your-own-writes), we have to use local
+		 * execution again if it has already happened within this
+		 * transaction block.
+		 *
+		 * We might error out later in the execution if it is not suitable
+		 * to execute the tasks locally.
+		 */
+		Assert(IsMultiStatementTransaction() || InCoordinatedTransaction());
+
+		/*
+		 * TODO: A future improvement could be to keep track of which placements
+		 * have been locally executed. At this point, only use local execution for
+		 * those placements. That'd help to benefit more from parallelism.
+		 */
+
+		return true;
+	}
+
+	singleTask = (list_length(taskList) == 1);
+	if (singleTask && TaskAccessesLocalNode((Task *) linitial(taskList)))
+	{
+		/*
+		 * This is the valuable time to use the local execution. We are likely
+		 * to avoid any network round-trips by simply executing the command within
+		 * this session.
+		 *
+		 * We cannot avoid network round trips if the task is not a read-only
+		 * task and accesses multiple placements. For example, modifications to
+		 * distributed tables (with replication factor == 1) would avoid network
+		 * round-trips. However, modifications to reference tables still need to
+		 * go over the network to do the modification on the other placements.
+		 * Still, we'll be avoiding the network round trip for this node.
+		 *
+		 * Note that we shouldn't use local execution if any distributed execution
+		 * has happened because that'd break transaction visibility rules and
+		 * many other things.
+		 */
+		return !AnyConnectionAccessedPlacements();
+	}
+
+	if (!singleTask)
+	{
+		/*
+		 * For multi-task executions, switching to local execution would likely
+		 * perform poorly, because we'd lose the parallelism. Note that the local
+		 * execution is happening one task at a time (e.g., similar to sequential
+		 * distributed execution).
+		 */
+		Assert(!LocalExecutionHappened);
+
+		return false;
+	}
+
+	return false;
+}
+
+
+/*
+ * TaskAccessesLocalNode returns true if any placements of the task reside on the
+ * node that we're executing the query on.
+ */
+static bool
+TaskAccessesLocalNode(Task *task)
+{
+	ListCell *placementCell = NULL;
+	int localGroupId = GetLocalGroupId();
+
+	foreach(placementCell, task->taskPlacementList)
+	{
+		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(placementCell);
+
+		if (taskPlacement->groupId == localGroupId)
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
+
+
+/*
+ * ErrorIfLocalExecutionHappened() errors out if a local query has already been executed
+ * in the same transaction.
+ *
+ * This check is required because Citus currently hasn't implemented local execution
+ * infrastructure for all the commands/executors. As we implement local execution for
+ * the commands/executors where this function is called, we should simply remove the check.
+ */
+void
+ErrorIfLocalExecutionHappened(void)
+{
+	if (LocalExecutionHappened)
+	{
+		ereport(ERROR, (errmsg("cannot execute command because a local execution has "
+							   "already been done in the transaction"),
+						errhint("Try re-running the transaction with "
+								"\"SET LOCAL citus.enable_local_execution TO OFF;\""),
+						errdetail("Some parallel commands cannot be executed if a "
+								  "previous command has already been executed locally")));
+	}
+}
+
+
+/*
+ * LogLocalCommand logs commands executed locally on this node. Although we're
+ * talking about local execution, the function relies on the citus.log_remote_commands GUC.
+ * This makes sense because the local execution is still on a shard of a distributed table,
+ * meaning it is part of distributed execution.
+ */
+static void
+LogLocalCommand(const char *command)
+{
+	if (!(LogRemoteCommands || LogLocalCommands))
+	{
+		return;
+	}
+
+	ereport(LOG, (errmsg("executing the command locally: %s",
+						 ApplyLogRedaction(command))));
+}
+
+
+/*
+ * DisableLocalExecution is simply a C interface for
+ * setting the following:
+ *      SET LOCAL citus.enable_local_execution TO off;
+ */
+void
+DisableLocalExecution(void)
+{
+	set_config_option("citus.enable_local_execution", "off",
+					  (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
+					  GUC_ACTION_LOCAL, true, 0, false);
+}
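For reference, a brief psql sketch of the logging added by LogLocalCommand() (the shard name test_102009 and the exact rewritten shard query text are made up for illustration); the LOG line is emitted when either citus.log_remote_commands or the new citus.log_local_commands is enabled:

    SET citus.log_local_commands TO on;
    SELECT count(*) FROM test WHERE key = 1;
    -- LOG:  executing the command locally: SELECT count(*) AS count FROM test_102009 test WHERE (key = 1)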
@@ -397,7 +397,7 @@ void
 ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
 								   DestReceiver *dest)
 {
-	Query *query = ParseQueryString(queryString);
+	Query *query = ParseQueryString(queryString, NULL, 0);
 
 	ExecuteQueryIntoDestReceiver(query, params, dest);
 }
@@ -407,11 +407,12 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params
  * ParseQuery parses query string and returns a Query struct.
  */
 Query *
-ParseQueryString(const char *queryString)
+ParseQueryString(const char *queryString, Oid *paramOids, int numParams)
 {
 	Query *query = NULL;
 	RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
-	List *queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL);
+	List *queryTreeList =
+		pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL);
 
 	if (list_length(queryTreeList) != 1)
 	{
@@ -26,6 +26,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/citus_custom_scan.h"
 #include "distributed/connection_management.h"
+#include "distributed/local_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_physical_planner.h"
@@ -1035,6 +1036,9 @@ RealTimeExecScan(CustomScanState *node)
 	DistributedPlan *distributedPlan = scanState->distributedPlan;
 	Job *workerJob = distributedPlan->workerJob;
 
+	ErrorIfLocalExecutionHappened();
+	DisableLocalExecution();
+
 	/* we are taking locks on partitions of partitioned tables */
 	LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
 
@@ -32,6 +32,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/listutils.h"
+#include "distributed/local_executor.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
@@ -588,6 +589,9 @@ RouterModifyExecScan(CustomScanState *node)
 	List *taskList = workerJob->taskList;
 	bool parallelExecution = true;
 
+	ErrorIfLocalExecutionHappened();
+	DisableLocalExecution();
+
 	ExecuteSubPlans(distributedPlan);
 
 	if (list_length(taskList) <= 1 ||
@@ -866,6 +870,9 @@ RouterSelectExecScan(CustomScanState *node)
 	Job *workerJob = distributedPlan->workerJob;
 	List *taskList = workerJob->taskList;
 
+	ErrorIfLocalExecutionHappened();
+	DisableLocalExecution();
+
 	/* we are taking locks on partitions of partitioned tables */
 	LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
 
@@ -28,6 +28,7 @@
 #include "distributed/citus_custom_scan.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/connection_management.h"
+#include "distributed/local_executor.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_executor.h"
@@ -3008,6 +3009,9 @@ TaskTrackerExecScan(CustomScanState *node)
 	Job *workerJob = distributedPlan->workerJob;
 	Query *jobQuery = workerJob->jobQuery;
 
+	ErrorIfLocalExecutionHappened();
+	DisableLocalExecution();
+
 	if (ContainsReadIntermediateResultFunction((Node *) jobQuery))
 	{
 		ereport(ERROR, (errmsg("Complex subqueries and CTEs are not supported when "
@@ -26,6 +26,7 @@
 #include "distributed/commands/utility_hook.h"
 #include "distributed/connection_management.h"
 #include "distributed/distributed_deadlock_detection.h"
+#include "distributed/local_executor.h"
 #include "distributed/maintenanced.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
@@ -430,6 +431,17 @@ RegisterCitusConfigVariables(void)
 		0,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.enable_local_execution",
+		gettext_noop("Enables queries on shards that are local to the current node "
+					 "to be planned and executed locally."),
+		NULL,
+		&EnableLocalExecution,
+		true,
+		PGC_USERSET,
+		0,
+		NULL, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"citus.enable_single_hash_repartition_joins",
 		gettext_noop("Enables single hash repartitioning between hash "
@@ -509,6 +521,17 @@ RegisterCitusConfigVariables(void)
 		0,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.log_local_commands",
+		gettext_noop("Log queries that are executed locally, can be overridden by "
+					 "citus.log_remote_commands"),
+		NULL,
+		&LogLocalCommands,
+		false,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"citus.log_distributed_deadlock_detection",
 		gettext_noop("Log distributed deadlock detection related processing in "
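A short usage sketch for the two GUCs registered above (defaults are true for citus.enable_local_execution and false for citus.log_local_commands, as set in the definitions):

    -- opt out of local execution for a single transaction, as the error hint suggests
    BEGIN;
    SET LOCAL citus.enable_local_execution TO off;
    -- ... run commands that are not yet supported after a local execution ...
    COMMIT;

    -- log locally executed shard queries (registered with GUC_NO_SHOW_ALL)
    SET citus.log_local_commands TO on;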
@@ -23,6 +23,8 @@
 #include "distributed/connection_management.h"
 #include "distributed/hash_helpers.h"
 #include "distributed/intermediate_results.h"
+#include "distributed/local_executor.h"
+#include "distributed/multi_executor.h"
 #include "distributed/multi_shard_transaction.h"
 #include "distributed/transaction_management.h"
 #include "distributed/placement_connection.h"
@ -225,6 +227,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
|
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
|
LocalExecutionHappened = false;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
activeSetStmts = NULL;
|
activeSetStmts = NULL;
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
|
@ -278,6 +281,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
|
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
|
LocalExecutionHappened = false;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
activeSetStmts = NULL;
|
activeSetStmts = NULL;
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
|
|
|
@@ -85,7 +85,7 @@ worker_execute_sql_task(PG_FUNCTION_ARGS)
 	StringInfo jobDirectoryName = JobDirectoryName(jobId);
 	StringInfo taskFilename = UserTaskFilename(jobDirectoryName, taskId);

-	query = ParseQueryString(queryString);
+	query = ParseQueryString(queryString, NULL, 0);
 	tuplesSent = WorkerExecuteSqlTask(query, taskFilename->data, binaryCopyFormat);

 	PG_RETURN_INT64(tuplesSent);
@@ -0,0 +1,30 @@
+/*-------------------------------------------------------------------------
+ *
+ * local_executor.h
+ *	Functions and global variables to control local query execution.
+ *
+ * Copyright (c) 2019, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef LOCAL_EXECUTION_H
+#define LOCAL_EXECUTION_H
+
+#include "distributed/citus_custom_scan.h"
+
+/* enabled with GUCs*/
+extern bool EnableLocalExecution;
+extern bool LogLocalCommands;
+
+extern bool LocalExecutionHappened;
+
+extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
+extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
+									   List **localTaskList, List **remoteTaskList);
+extern bool ShouldExecuteTasksLocally(List *taskList);
+extern void ErrorIfLocalExecutionHappened(void);
+extern void DisableLocalExecution(void);
+extern bool AnyTaskAccessesRemoteNode(List *taskList);
+
+#endif /* LOCAL_EXECUTION_H */
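Taken together, these declarations describe the flow the adaptive executor is expected to follow: decide whether the task list qualifies for local execution, split it into local and remote parts, run the local part through PostgreSQL's planner and executor, and hand the rest to the regular remote path. Since the custom scan and local_executor.c changes are in the suppressed diffs, the following is only a sketch of that wiring under those assumptions; ExecuteRemoteTaskList is a hypothetical stand-in for the adaptive executor's existing remote entry point.

```c
/* hedged sketch of how the adaptive executor might wire these together */
#include "postgres.h"
#include "nodes/pg_list.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/local_executor.h"

extern uint64 ExecuteRemoteTaskList(CitusScanState *scanState, List *taskList); /* hypothetical */

static uint64
ExecuteTaskListWithLocalExecution(CitusScanState *scanState, List *taskList,
								  bool readOnlyPlan)
{
	uint64 processedRows = 0;

	if (ShouldExecuteTasksLocally(taskList))
	{
		List *localTaskList = NIL;
		List *remoteTaskList = NIL;

		/* tasks on shards of this node vs. tasks that need a connection */
		ExtractLocalAndRemoteTasks(readOnlyPlan, taskList,
								   &localTaskList, &remoteTaskList);

		/* no network round-trip for the local part */
		processedRows += ExecuteLocalTaskList(scanState, localTaskList);

		/* e.g., reference table modifications continue on the other nodes */
		taskList = remoteTaskList;
	}

	/* hand whatever is left to the regular adaptive execution path */
	processedRows += ExecuteRemoteTaskList(scanState, taskList);

	return processedRows;
}
```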
@@ -50,7 +50,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
 extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
 extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
 								   tupleDescriptor, Tuplestorestate *tupstore);
-extern Query * ParseQueryString(const char *queryString);
+extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams);
 extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
 											   params,
 											   DestReceiver *dest);
@@ -61,6 +61,7 @@ extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo pa
 extern void SetLocalMultiShardModifyModeToSequential(void);
 extern void SetLocalForceMaxQueryParallelization(void);
 extern void SortTupleStore(CitusScanState *scanState);
+extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);

 #endif /* MULTI_EXECUTOR_H */
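ParseQueryString() grows two arguments so callers can parse statements that contain $n parameters, which locally executing prepared statements needs; existing callers such as worker_execute_sql_task simply pass (NULL, 0), as shown earlier. A hedged example of the new signature follows; the surrounding function and query text are illustrative, not part of the patch.

```c
#include "postgres.h"
#include "catalog/pg_type.h"
#include "distributed/multi_executor.h"

/* illustrative caller: parse a statement that has one int4 parameter */
static Query *
ParseParameterizedDeleteString(void)
{
	Oid paramOids[1] = { INT4OID };

	return ParseQueryString("DELETE FROM distributed_table WHERE key = $1",
							paramOids, 1);
}
```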
@@ -67,6 +67,7 @@ extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
 extern void ResetShardPlacementAssociation(struct MultiConnection *connection);

 extern void InitPlacementConnectionManagement(void);
+extern bool AnyConnectionAccessedPlacements(void);

 extern bool ConnectionModifiedPlacement(MultiConnection *connection);
 extern bool ConnectionUsedForAnyPlacements(MultiConnection *connection);
(Diffs for two files are suppressed here because they are too large to display.)
@@ -23,6 +23,7 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32744;

 -- now singe-row INSERT to the other worker
 \c - - - :worker_2_port
+\set VERBOSITY terse
 INSERT INTO limit_orders_mx VALUES (32745, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                     20.69);
 SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32745;
@@ -77,22 +78,15 @@ ERROR: cannot perform an INSERT with NULL in the partition column
 INSERT INTO limit_orders_mx VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
                                     -5.00);
 ERROR: new row for relation "limit_orders_mx_1220092" violates check constraint "limit_orders_mx_limit_price_check"
-DETAIL: Failing row contains (18811, BUD, 14962, 2014-04-05 08:32:16, sell, -5.00).
-CONTEXT: while executing command on localhost:57637
 -- INSERT violating primary key constraint
 INSERT INTO limit_orders_mx VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58);
 ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093"
-DETAIL: Key (id)=(32743) already exists.
-CONTEXT: while executing command on localhost:57638
 -- INSERT violating primary key constraint, with RETURNING specified.
 INSERT INTO limit_orders_mx VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *;
 ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093"
-DETAIL: Key (id)=(32743) already exists.
-CONTEXT: while executing command on localhost:57638
 -- INSERT, with RETURNING specified, failing with a non-constraint error
 INSERT INTO limit_orders_mx VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
 ERROR: division by zero
-CONTEXT: while executing command on localhost:57638
 SET client_min_messages TO DEFAULT;
 -- commands with non-constant partition values are unsupported
 INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
@@ -221,8 +215,6 @@ UPDATE limit_orders_mx SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RET
 INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
 INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
 ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093"
-DETAIL: Key (id)=(275) already exists.
-CONTEXT: while executing command on localhost:57638
 -- multi shard update is supported
 UPDATE limit_orders_mx SET limit_price = 0.00;
 -- attempting to change the partition key is unsupported
@@ -294,8 +286,6 @@ CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS
 'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT;
 UPDATE limit_orders_mx SET bidder_id = temp_strict_func(1, null) WHERE id = 246;
 ERROR: null value in column "bidder_id" violates not-null constraint
-DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 0.00, {1,2}).
-CONTEXT: while executing command on localhost:57637
 SELECT array_of_values FROM limit_orders_mx WHERE id = 246;
  array_of_values
 -----------------
@@ -137,6 +137,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a

 -- and the other way around is also allowed
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
 COMMIT;
@@ -158,6 +159,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a

 -- and the other way around is also allowed
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
 COMMIT;
@@ -166,6 +168,7 @@ COMMIT;
 -- this logic doesn't apply to router SELECTs occurring after a modification:
 -- selecting from the modified node is fine...
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
  count
@@ -176,6 +179,7 @@ SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
 ABORT;
 -- doesn't apply to COPY after modifications
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 \copy labs_mx from stdin delimiter ','
 COMMIT;
@@ -198,7 +202,6 @@ INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO objects_mx VALUES (1, 'orange');
 ERROR: duplicate key value violates unique constraint "objects_mx_pkey_1220103"
 DETAIL: Key (id)=(1) already exists.
-CONTEXT: while executing command on localhost:57637
 COMMIT;
 -- data shouldn't have persisted...
 SELECT * FROM objects_mx WHERE id = 1;
@@ -282,6 +285,7 @@ AFTER INSERT ON labs_mx_1220102
 DEFERRABLE INITIALLY IMMEDIATE
 FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO objects_mx VALUES (2, 'BAD');
 ERROR: illegal value
@@ -332,6 +336,7 @@ DEFERRABLE INITIALLY DEFERRED
 FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
 -- should be the same story as before, just at COMMIT time
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO objects_mx VALUES (2, 'BAD');
 INSERT INTO labs_mx VALUES (9, 'Umbrella Corporation');
@@ -358,6 +363,7 @@ AFTER INSERT ON labs_mx_1220102
 DEFERRABLE INITIALLY DEFERRED
 FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO objects_mx VALUES (2, 'BAD');
 INSERT INTO labs_mx VALUES (8, 'Aperture Science');
@@ -383,6 +389,7 @@ SELECT * FROM labs_mx WHERE id = 8;
 \c - - - :worker_1_port
 DROP TRIGGER reject_bad_mx ON objects_mx_1220103;
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO labs_mx VALUES (8, 'Aperture Science');
 INSERT INTO labs_mx VALUES (9, 'BAD');
@@ -101,6 +101,7 @@ SELECT count(*) FROM pg_dist_transaction;

 -- Multi-statement transactions should write 2 transaction recovery records
 BEGIN;
+SET LOCAL citus.enable_local_execution TO false;
 INSERT INTO test_recovery VALUES ('hello');
 INSERT INTO test_recovery VALUES ('world');
 COMMIT;
@@ -116,6 +117,26 @@ SELECT recover_prepared_transactions();
 0
 (1 row)

+-- the same transaction block, but this time
+-- enable local execution as well. The first
+-- command is locally executed, the second
+-- is remote, so 1 entry is expected
+BEGIN;
+INSERT INTO test_recovery VALUES ('hello');
+INSERT INTO test_recovery VALUES ('world');
+COMMIT;
+SELECT count(*) FROM pg_dist_transaction;
+ count
+-------
+     1
+(1 row)
+
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+                             0
+(1 row)
+
 -- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
 INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
 SELECT count(*) FROM pg_dist_transaction;
@@ -31,7 +31,7 @@ test: recursive_dml_queries_mx multi_mx_truncate_from_worker
 test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
 test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
 test: multi_mx_metadata
-test: multi_mx_modifications
+test: multi_mx_modifications local_shard_execution
 test: multi_mx_transaction_recovery
 test: multi_mx_modifying_xacts
 test: multi_mx_explain
@@ -0,0 +1,670 @@
+CREATE SCHEMA local_shard_execution;
+SET search_path TO local_shard_execution;
+
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SET citus.replication_model TO 'streaming';
+SET citus.next_shard_id TO 1470000;
+
+CREATE TABLE reference_table (key int PRIMARY KEY);
+SELECT create_reference_table('reference_table');
+
+CREATE TABLE distributed_table (key int PRIMARY KEY , value text, age bigint CHECK (age > 10), FOREIGN KEY (key) REFERENCES reference_table(key) ON DELETE CASCADE);
+SELECT create_distributed_table('distributed_table','key');
+
+CREATE TABLE second_distributed_table (key int PRIMARY KEY , value text, FOREIGN KEY (key) REFERENCES distributed_table(key) ON DELETE CASCADE);
+SELECT create_distributed_table('second_distributed_table','key');
+
+-- ingest some data to enable some tests with data
+INSERT INTO reference_table VALUES (1);
+INSERT INTO distributed_table VALUES (1, '1', 20);
+INSERT INTO second_distributed_table VALUES (1, '1');
+
+-- a simple test for
+CREATE TABLE collections_list (
+	key int,
+	ts timestamptz,
+	collection_id integer,
+	value numeric,
+	PRIMARY KEY(key, collection_id)
+) PARTITION BY LIST (collection_id );
+
+SELECT create_distributed_table('collections_list', 'key');
+
+CREATE TABLE collections_list_0
+	PARTITION OF collections_list (key, ts, collection_id, value)
+	FOR VALUES IN ( 0 );
+
+-- connection worker and get ready for the tests
+\c - - - :worker_1_port
+SET search_path TO local_shard_execution;
+
+-- returns true of the distribution key filter
+-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard
+-- placement which is local to this not
+CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$
+
+	DECLARE shard_is_local BOOLEAN := FALSE;
+
+	BEGIN
+
+		WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_execution.distributed_table', dist_key)),
+		all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group))
+		SELECT
+			true INTO shard_is_local
+		FROM
+			local_shard_ids
+		WHERE
+			get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node);
+
+		IF shard_is_local IS NULL THEN
+			shard_is_local = FALSE;
+		END IF;
+
+		RETURN shard_is_local;
+	END;
+$$ LANGUAGE plpgsql;
+
+-- pick some example values that reside on the shards locally and remote
+
+-- distribution key values of 1,6, 500 and 701 are LOCAL to shards,
+-- we'll use these values in the tests
+SELECT shard_of_distribution_column_is_local(1);
+SELECT shard_of_distribution_column_is_local(6);
+SELECT shard_of_distribution_column_is_local(500);
+SELECT shard_of_distribution_column_is_local(701);
+
+-- distribution key values of 11 and 12 are REMOTE to shards
+SELECT shard_of_distribution_column_is_local(11);
+SELECT shard_of_distribution_column_is_local(12);
+
+--- enable logging to see which tasks are executed locally
+SET client_min_messages TO LOG;
+SET citus.log_local_commands TO ON;
+
+-- first, make sure that local execution works fine
+-- with simple queries that are not in transcation blocks
+SELECT count(*) FROM distributed_table WHERE key = 1;
+
+-- multiple tasks both of which are local should NOT use local execution
+-- because local execution means executing the tasks locally, so the executor
+-- favors parallel execution even if everyting is local to node
+SELECT count(*) FROM distributed_table WHERE key IN (1,6);
+
+-- queries that hit any remote shards should NOT use local execution
+SELECT count(*) FROM distributed_table WHERE key IN (1,11);
+SELECT count(*) FROM distributed_table;
+
+-- modifications also follow the same rules
+INSERT INTO reference_table VALUES (1) ON CONFLICT DO NOTHING;
+INSERT INTO distributed_table VALUES (1, '1', 21) ON CONFLICT DO NOTHING;
+
+-- local query
+DELETE FROM distributed_table WHERE key = 1 AND age = 21;
+
+-- hitting multiple shards, so should be a distributed execution
+DELETE FROM distributed_table WHERE age = 21;
+
+-- although both shards are local, the executor choose the parallel execution
+-- over the wire because as noted above local execution is sequential
+DELETE FROM second_distributed_table WHERE key IN (1,6);
+
+-- similarly, any multi-shard query just follows distributed execution
+DELETE FROM second_distributed_table;
+
+-- load some more data for the following tests
+INSERT INTO second_distributed_table VALUES (1, '1');
+
+-- INSERT .. SELECT hitting a single single (co-located) shard(s) should
+-- be executed locally
+INSERT INTO distributed_table
+SELECT
+	distributed_table.*
+FROM
+	distributed_table, second_distributed_table
+WHERE
+	distributed_table.key = 1 and distributed_table.key=second_distributed_table.key
+ON CONFLICT(key) DO UPDATE SET value = '22'
+RETURNING *;
+
+-- INSERT .. SELECT hitting multi-shards should go thourgh distributed execution
+INSERT INTO distributed_table
+SELECT
+	distributed_table.*
+FROM
+	distributed_table, second_distributed_table
+WHERE
+	distributed_table.key != 1 and distributed_table.key=second_distributed_table.key
+ON CONFLICT(key) DO UPDATE SET value = '22'
+RETURNING *;
+
+-- INSERT..SELECT via coordinator consists of two steps, select + COPY
+-- that's why it is disallowed to use local execution even if the SELECT
+-- can be executed locally
+INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING;
+INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING;
+
+-- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution
+INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING;
+
+-- EXPLAIN for local execution just works fine
+-- though going through distributed execution
+EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
+
+-- TODO: Fix #2922
+-- EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
+
+EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
+
+-- TODO: Fix #2922
+-- EXPLAIN ANALYZE DELETE FROM distributed_table WHERE key = 1 AND age = 20;
+
+-- show that EXPLAIN ANALYZE deleted the row
+SELECT * FROM distributed_table WHERE key = 1 AND age = 20 ORDER BY 1,2,3;
+
+-- copy always happens via distributed execution irrespective of the
+-- shards that are accessed
+COPY reference_table FROM STDIN;
+6
+11
+\.
+
+COPY distributed_table FROM STDIN WITH CSV;
+6,'6',25
+11,'11',121
+\.
+
+COPY second_distributed_table FROM STDIN WITH CSV;
+6,'6'
+\.
+
+-- the behaviour in transaction blocks is the following:
+-- (a) Unless the first query is a local query, always use distributed execution.
+-- (b) If the executor has used local execution, it has to use local execution
+-- for the remaining of the transaction block. If that's not possible, the
+-- executor has to error out (e.g., TRUNCATE is a utility command and we
+-- currently do not support local execution of utility commands)
+
+-- rollback should be able to rollback local execution
+BEGIN;
+INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
+SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
+ROLLBACK;
+
+-- make sure that the value is rollbacked
+SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
+
+-- rollback should be able to rollback both the local and distributed executions
+BEGIN;
+INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
+DELETE FROM distributed_table;
+
+-- DELETE should cascade, and we should not see any rows
+SELECT count(*) FROM second_distributed_table;
+ROLLBACK;
+
+-- make sure that everything is rollbacked
+SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
+SELECT count(*) FROM second_distributed_table;
+
+-- very simple examples, an SELECTs should see the modifications
+-- that has done before
+BEGIN;
+-- INSERT is executed locally
+INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '23' RETURNING *;
+
+-- since the INSERT is executed locally, the SELECT should also be
+-- executed locally and see the changes
+SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
+
+-- multi-shard SELECTs are now forced to use local execution on
+-- the shards that reside on this node
+SELECT * FROM distributed_table WHERE value = '23' ORDER BY 1,2,3;
+
+-- similarly, multi-shard modifications should use local exection
+-- on the shards that reside on this node
+DELETE FROM distributed_table WHERE value = '23';
+
+-- make sure that the value is deleted
+SELECT * FROM distributed_table WHERE value = '23' ORDER BY 1,2,3;
+COMMIT;
+
+-- make sure that we've committed everything
+SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
+
+-- if we start with a distributed execution, we should keep
+-- using that and never switch back to local execution
+BEGIN;
+DELETE FROM distributed_table WHERE value = '11';
+
+-- although this command could have been executed
+-- locally, it is not going to be executed locally
+SELECT * FROM distributed_table WHERE key = 1 ORDER BY 1,2,3;
+
+-- but we can still execute parallel queries, even if
+-- they are utility commands
+TRUNCATE distributed_table CASCADE;
+
+-- TRUNCATE cascaded into second_distributed_table
+SELECT count(*) FROM second_distributed_table;
+ROLLBACK;
+
+-- load some data so that foreign keys won't complain with the next tests
+INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
+
+-- a very similar set of operation, but this time use
+-- COPY as the first command
+BEGIN;
+INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
+
+-- this could go through local execution, but doesn't because we've already
+-- done distributed execution
+SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
+
+-- utility commands could still use distributed execution
+TRUNCATE distributed_table CASCADE;
+
+-- ensure that TRUNCATE made it
+SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
+ROLLBACK;
+
+-- show that cascading foreign keys just works fine with local execution
+BEGIN;
+INSERT INTO reference_table VALUES (701);
+INSERT INTO distributed_table VALUES (701, '701', 701);
+INSERT INTO second_distributed_table VALUES (701, '701');
+
+DELETE FROM reference_table WHERE key = 701;
+
+SELECT count(*) FROM distributed_table WHERE key = 701;
+SELECT count(*) FROM second_distributed_table WHERE key = 701;
+
+-- multi-shard commands should also see the changes
+SELECT count(*) FROM distributed_table WHERE key > 700;
+
+-- we can still do multi-shard commands
+DELETE FROM distributed_table;
+ROLLBACK;
+
+-- multiple queries hitting different shards can be executed locally
+BEGIN;
+SELECT count(*) FROM distributed_table WHERE key = 1;
+SELECT count(*) FROM distributed_table WHERE key = 6;
+SELECT count(*) FROM distributed_table WHERE key = 500;
+ROLLBACK;
+
+-- a local query is followed by a command that cannot be executed locally
+BEGIN;
+SELECT count(*) FROM distributed_table WHERE key = 1;
+TRUNCATE distributed_table CASCADE;
+ROLLBACK;
+
+-- a local query is followed by a command that cannot be executed locally
+BEGIN;
+SELECT count(*) FROM distributed_table WHERE key = 1;
+
+-- even no need to supply any data
+COPY distributed_table FROM STDIN WITH CSV;
+ROLLBACK;
+
+-- a local query is followed by a command that cannot be executed locally
+BEGIN;
+SELECT count(*) FROM distributed_table WHERE key = 1;
+
+INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i;
+ROLLBACK;
+
+-- a local query is followed by a command that cannot be executed locally
+BEGIN;
+SELECT count(*) FROM distributed_table WHERE key = 1;
+
+INSERT INTO distributed_table (key) SELECT key+1 FROM distributed_table;
+ROLLBACK;
+
+INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
+
+BEGIN;
+DELETE FROM distributed_table WHERE key = 1;
+EXPLAIN ANALYZE DELETE FROM distributed_table WHERE key = 1;
+ROLLBACK;
+
+BEGIN;
+INSERT INTO distributed_table VALUES (11, '111',29) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
+
+-- this is already disallowed on the nodes, adding it in case we
+-- support DDLs from the worker nodes in the future
+ALTER TABLE distributed_table ADD COLUMN x INT;
+ROLLBACK;
+
+BEGIN;
+INSERT INTO distributed_table VALUES (11, '111',29) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
+
+-- this is already disallowed because VACUUM cannot be executed in tx block
+-- adding in case this is supported some day
+VACUUM second_distributed_table;
+ROLLBACK;
+
+-- make sure that functions can use local execution
+CREATE OR REPLACE PROCEDURE only_local_execution() AS $$
+	DECLARE cnt INT;
+	BEGIN
+		INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
+		SELECT count(*) INTO cnt FROM distributed_table WHERE key = 1;
+		DELETE FROM distributed_table WHERE key = 1;
+	END;
+$$ LANGUAGE plpgsql;
+
+CALL only_local_execution();
+
+CREATE OR REPLACE PROCEDURE only_local_execution_with_params(int) AS $$
+	DECLARE cnt INT;
+	BEGIN
+		INSERT INTO distributed_table VALUES ($1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
+		SELECT count(*) INTO cnt FROM distributed_table WHERE key = $1;
+		DELETE FROM distributed_table WHERE key = $1;
+	END;
+$$ LANGUAGE plpgsql;
+
+CALL only_local_execution_with_params(1);
+
+CREATE OR REPLACE PROCEDURE local_execution_followed_by_dist() AS $$
+	DECLARE cnt INT;
+	BEGIN
+		INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29';
+		SELECT count(*) INTO cnt FROM distributed_table WHERE key = 1;
+		DELETE FROM distributed_table;
+		SELECT count(*) INTO cnt FROM distributed_table;
+	END;
+$$ LANGUAGE plpgsql;
+
+CALL local_execution_followed_by_dist();
+
+-- test CTEs, including modifying CTEs
+WITH local_insert AS (INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *),
+distributed_local_mixed AS (SELECT * FROM reference_table WHERE key IN (SELECT key FROM local_insert))
+SELECT * FROM local_insert, distributed_local_mixed;
+
+-- since we start with parallel execution, we do not switch back to local execution in the
+-- latter CTEs
+WITH distributed_local_mixed AS (SELECT * FROM distributed_table),
+local_insert AS (INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *)
+SELECT * FROM local_insert, distributed_local_mixed ORDER BY 1,2,3,4,5;
+
+-- router CTE pushdown
+WITH all_data AS (SELECT * FROM distributed_table WHERE key = 1)
+SELECT
+	count(*)
+FROM
+	distributed_table, all_data
+WHERE
+	distributed_table.key = all_data.key AND distributed_table.key = 1;
+
+INSERT INTO reference_table VALUES (2);
+INSERT INTO distributed_table VALUES (2, '29', 29);
+INSERT INTO second_distributed_table VALUES (2, '29');
+
+-- single shard that is not a local query followed by a local query
+WITH all_data AS (SELECT * FROM second_distributed_table WHERE key = 2)
+SELECT
+	distributed_table.key
+FROM
+	distributed_table, all_data
+WHERE
+	distributed_table.value = all_data.value AND distributed_table.key = 1
+ORDER BY
+	1 DESC;
+
+-- multi-shard CTE is followed by a query which could be executed locally, but
+-- since the query started with a parallel query, it doesn't use local execution
+WITH all_data AS (SELECT * FROM distributed_table)
+SELECT
+	count(*)
+FROM
+	distributed_table, all_data
+WHERE
+	distributed_table.key = all_data.key AND distributed_table.key = 1;
+
+-- get ready for the next commands
+TRUNCATE reference_table, distributed_table, second_distributed_table;
+
+-- local execution of returning of reference tables
+INSERT INTO reference_table VALUES (1),(2),(3),(4),(5),(6) RETURNING *;
+
+-- local execution of multi-row INSERTs
+INSERT INTO distributed_table VALUES (1, '11',21), (5,'55',22) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
+
+-- distributed execution of multi-rows INSERTs, where some part of the execution
+-- could have been done via local execution but the executor choose the other way around
+-- because the command is a multi-shard query
+INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
+
+PREPARE local_prepare_no_param AS SELECT count(*) FROM distributed_table WHERE key = 1;
+PREPARE local_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key = $1;
+PREPARE remote_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key != $1;
+BEGIN;
+-- 6 local execution without params
+EXECUTE local_prepare_no_param;
+EXECUTE local_prepare_no_param;
+EXECUTE local_prepare_no_param;
+EXECUTE local_prepare_no_param;
+EXECUTE local_prepare_no_param;
+EXECUTE local_prepare_no_param;
+
+-- 6 local executions with params
+EXECUTE local_prepare_param(1);
+EXECUTE local_prepare_param(5);
+EXECUTE local_prepare_param(6);
+EXECUTE local_prepare_param(1);
+EXECUTE local_prepare_param(5);
+EXECUTE local_prepare_param(6);
+
+-- followed by a non-local execution
+EXECUTE remote_prepare_param(1);
+COMMIT;
+
+-- failures of local execution should rollback both the
+-- local execution and remote executions
+
+-- fail on a local execution
+BEGIN;
+INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '100' RETURNING *;
+
+UPDATE distributed_table SET value = '200';
+
+INSERT INTO distributed_table VALUES (1, '100',21) ON CONFLICT(key) DO UPDATE SET value = (1 / (100.0 - EXCLUDED.value::int))::text RETURNING *;
+ROLLBACK;
+
+-- we've rollbacked everything
+SELECT count(*) FROM distributed_table WHERE value = '200';
+
+-- RETURNING should just work fine for reference tables
+INSERT INTO reference_table VALUES (500) RETURNING *;
+DELETE FROM reference_table WHERE key = 500 RETURNING *;
+
+-- should be able to skip local execution even if in a sequential mode of execution
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO sequential ;
+
+DELETE FROM distributed_table;
+INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '100' RETURNING *;
+ROLLBACK;
+
+-- sequential execution should just work fine after a local execution
+BEGIN;
+SET citus.multi_shard_modify_mode TO sequential ;
+
+INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '100' RETURNING *;
+DELETE FROM distributed_table;
+ROLLBACK;
+
+-- load some data so that foreign keys won't complain with the next tests
+TRUNCATE reference_table CASCADE;
+INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
+INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
+
+-- show that both local, and mixed local-distributed executions
+-- calculate rows processed correctly
+BEGIN;
+DELETE FROM distributed_table WHERE key = 500;
+
+DELETE FROM distributed_table WHERE value != '123123213123213';
+ROLLBACK;
+
+BEGIN;
+
+DELETE FROM reference_table WHERE key = 500 RETURNING *;
+
+DELETE FROM reference_table;
+ROLLBACK;
+
+-- mix with other executors should fail
+
+-- router modify execution should error
+BEGIN;
+DELETE FROM distributed_table WHERE key = 500;
+
+SET LOCAL citus.task_executor_type = 'real-time';
+
+DELETE FROM distributed_table;
+ROLLBACK;
+
+-- local execution should not be executed locally
+-- becase a multi-shard router query has already been executed
+BEGIN;
+
+SET LOCAL citus.task_executor_type = 'real-time';
+
+DELETE FROM distributed_table;
+
+DELETE FROM distributed_table WHERE key = 500;
+ROLLBACK;
+
+-- router select execution
+BEGIN;
+DELETE FROM distributed_table WHERE key = 500;
+
+SET LOCAL citus.task_executor_type = 'real-time';
+
+SELECT count(*) FROM distributed_table WHERE key = 500;
+ROLLBACK;
+
+-- local execution should not be executed locally
+-- becase a single-shard router query has already been executed
+BEGIN;
+SET LOCAL citus.task_executor_type = 'real-time';
+
+SELECT count(*) FROM distributed_table WHERE key = 500;
+
+DELETE FROM distributed_table WHERE key = 500;
+ROLLBACK;
+
+-- real-time select execution
+BEGIN;
+DELETE FROM distributed_table WHERE key = 500;
+
+SET LOCAL citus.task_executor_type = 'real-time';
+
+SELECT count(*) FROM distributed_table;
+ROLLBACK;
+
+-- local execution should not be executed locally
+-- becase a real-time query has already been executed
+BEGIN;
+SET LOCAL citus.task_executor_type = 'real-time';
+
+SELECT count(*) FROM distributed_table;
+
+DELETE FROM distributed_table WHERE key = 500;
+ROLLBACK;
+
+-- task-tracker select execution
+BEGIN;
+DELETE FROM distributed_table WHERE key = 500;
+
+SET LOCAL citus.task_executor_type = 'task-tracker';
+
+SELECT count(*) FROM distributed_table;
+ROLLBACK;
+
+-- local execution should not be executed locally
+-- becase a task-tracker query has already been executed
+BEGIN;
+SET LOCAL citus.task_executor_type = 'task-tracker';
+SET LOCAL client_min_messages TO INFO;
+SELECT count(*) FROM distributed_table;
+SET LOCAL client_min_messages TO LOG;
+
+DELETE FROM distributed_table WHERE key = 500;
+ROLLBACK;
+
+-- probably not a realistic case since views are not very
+-- well supported with MX
+CREATE VIEW v_local_query_execution AS
+SELECT * FROM distributed_table WHERE key = 500;
+
+SELECT * FROM v_local_query_execution;
+
+-- similar test, but this time the view itself is a non-local
+-- query, but the query on the view is local
+CREATE VIEW v_local_query_execution_2 AS
+SELECT * FROM distributed_table;
+
+SELECT * FROM v_local_query_execution_2 WHERE key = 500;
+
+-- even if we switch from remote execution -> local execution,
+-- we are able to use remote execution after rollback
+BEGIN;
+SAVEPOINT my_savepoint;
+
+SELECT count(*) FROM distributed_table;
+
+DELETE FROM distributed_table WHERE key = 500;
+
+ROLLBACK TO SAVEPOINT my_savepoint;
+
+DELETE FROM distributed_table WHERE key = 500;
+
+COMMIT;
+
+-- even if we switch from local execution -> remote execution,
+-- we are able to use local execution after rollback
+BEGIN;
+
+SAVEPOINT my_savepoint;
+
+DELETE FROM distributed_table WHERE key = 500;
+
+SELECT count(*) FROM distributed_table;
+
+ROLLBACK TO SAVEPOINT my_savepoint;
+
+DELETE FROM distributed_table WHERE key = 500;
+
+COMMIT;
+
+-- sanity check: local execution on partitions
+BEGIN;
+INSERT INTO collections_list (key, collection_id) VALUES (1,0);
+SELECT count(*) FROM collections_list_0 WHERE key = 1;
+SELECT count(*) FROM collections_list;
+COMMIT;
+
+-- the final queries for the following CTEs are going to happen on the intermediate results only
+-- one of them will be executed remotely, and the other is locally
+-- Citus currently doesn't allow using task_assignment_policy for intermediate results
+WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed;
+
+\c - - - :master_port
+SET client_min_messages TO ERROR;
+SET search_path TO public;
+DROP SCHEMA local_shard_execution CASCADE;
@@ -19,6 +19,8 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32744;

 -- now singe-row INSERT to the other worker
 \c - - - :worker_2_port
+\set VERBOSITY terse
+
 INSERT INTO limit_orders_mx VALUES (32745, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                     20.69);
 SELECT COUNT(*) FROM limit_orders_mx WHERE id = 32745;
@@ -120,6 +120,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a

 -- and the other way around is also allowed
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
 COMMIT;
@@ -136,6 +137,7 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a

 -- and the other way around is also allowed
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
 COMMIT;
@@ -146,12 +148,14 @@ COMMIT;
 -- this logic doesn't apply to router SELECTs occurring after a modification:
 -- selecting from the modified node is fine...
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
 ABORT;

 -- doesn't apply to COPY after modifications
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
 \copy labs_mx from stdin delimiter ','
 10,Weyland-Yutani-1
@@ -241,6 +245,7 @@ DEFERRABLE INITIALLY IMMEDIATE
 FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();

 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO objects_mx VALUES (2, 'BAD');
 INSERT INTO labs_mx VALUES (8, 'Aperture Science');
@@ -279,6 +284,7 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();

 -- should be the same story as before, just at COMMIT time
 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO objects_mx VALUES (2, 'BAD');
 INSERT INTO labs_mx VALUES (9, 'Umbrella Corporation');
@@ -297,6 +303,7 @@ DEFERRABLE INITIALLY DEFERRED
 FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();

 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO objects_mx VALUES (2, 'BAD');
 INSERT INTO labs_mx VALUES (8, 'Aperture Science');
@@ -313,6 +320,7 @@ SELECT * FROM labs_mx WHERE id = 8;
 DROP TRIGGER reject_bad_mx ON objects_mx_1220103;

 BEGIN;
+SET LOCAL citus.enable_local_execution TO off;
 INSERT INTO objects_mx VALUES (1, 'apple');
 INSERT INTO labs_mx VALUES (8, 'Aperture Science');
 INSERT INTO labs_mx VALUES (9, 'BAD');
@@ -64,6 +64,18 @@ SELECT count(*) FROM pg_dist_transaction;

 -- Multi-statement transactions should write 2 transaction recovery records
 BEGIN;
+SET LOCAL citus.enable_local_execution TO false;
+INSERT INTO test_recovery VALUES ('hello');
+INSERT INTO test_recovery VALUES ('world');
+COMMIT;
+SELECT count(*) FROM pg_dist_transaction;
+SELECT recover_prepared_transactions();
+
+-- the same transaction block, but this time
+-- enable local execution as well. The first
+-- command is locally executed, the second
+-- is remote, so 1 entry is expected
+BEGIN;
 INSERT INTO test_recovery VALUES ('hello');
 INSERT INTO test_recovery VALUES ('world');
 COMMIT;