mirror of https://github.com/citusdata/citus.git
use localExecution in ExecuteTaskListExtended
ExecuteTaskListExtended is the common method for different codepaths, and instead of writing separate local execution logics in different codepaths, it makes more sense to have the logic here. We still need to do some refactoring, this is an initial step. After this commit, we can run create shard commands locally. There is a special case with shard creation commands. A create shard command might have a concatenated query string, however local execution did not know how to execute a task with multiple query strings. This is also implemented in this commit. We go over each query in the concatenated query string and plan/execute them one by one. A more clean solution to this would be to make sure that each task has a single query. We currently cannot do that because we need to ensure the task dependencies. However, it would make sense to do that at some point and it would simplify the code a lot.enh/localExecuteSelectInto
parent
dfcf1d07b2
commit
4dc006d52a
|
@ -189,7 +189,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
||||||
ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task),
|
ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task),
|
||||||
tupleDesc, tupleStore, hasReturning,
|
tupleDesc, tupleStore, hasReturning,
|
||||||
MaxAdaptiveExecutorPoolSize,
|
MaxAdaptiveExecutorPoolSize,
|
||||||
&xactProperties, NIL);
|
&xactProperties, NIL, true);
|
||||||
|
|
||||||
while (tuplestore_gettupleslot(tupleStore, true, false, slot))
|
while (tuplestore_gettupleslot(tupleStore, true, false, slot))
|
||||||
{
|
{
|
||||||
|
|
|
@ -861,7 +861,8 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupporte
|
||||||
/* execute remote tasks if any */
|
/* execute remote tasks if any */
|
||||||
if (list_length(remoteTaskList) > 0)
|
if (list_length(remoteTaskList) > 0)
|
||||||
{
|
{
|
||||||
ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize);
|
ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize,
|
||||||
|
false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -884,7 +885,7 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
|
||||||
|
|
||||||
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
||||||
tupleStore, hasReturning, targetPoolSize,
|
tupleStore, hasReturning, targetPoolSize,
|
||||||
&xactProperties, jobIdList);
|
&xactProperties, jobIdList, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -893,7 +894,8 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
|
||||||
* for some of the arguments.
|
* for some of the arguments.
|
||||||
*/
|
*/
|
||||||
uint64
|
uint64
|
||||||
ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
|
ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool
|
||||||
|
localExecutionSupported)
|
||||||
{
|
{
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
Tuplestorestate *tupleStore = NULL;
|
Tuplestorestate *tupleStore = NULL;
|
||||||
|
@ -904,7 +906,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
|
||||||
|
|
||||||
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
||||||
tupleStore, hasReturning, targetPoolSize,
|
tupleStore, hasReturning, targetPoolSize,
|
||||||
&xactProperties, NIL);
|
&xactProperties, NIL, localExecutionSupported);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -924,7 +926,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
||||||
|
|
||||||
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
||||||
tupleStore, hasReturning, targetPoolSize,
|
tupleStore, hasReturning, targetPoolSize,
|
||||||
&xactProperties, NIL);
|
&xactProperties, NIL, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -936,9 +938,30 @@ uint64
|
||||||
ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
||||||
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
|
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
|
||||||
bool hasReturning, int targetPoolSize,
|
bool hasReturning, int targetPoolSize,
|
||||||
TransactionProperties *xactProperties, List *jobIdList)
|
TransactionProperties *xactProperties,
|
||||||
|
List *jobIdList,
|
||||||
|
bool localExecutionSupported)
|
||||||
{
|
{
|
||||||
ParamListInfo paramListInfo = NULL;
|
ParamListInfo paramListInfo = NULL;
|
||||||
|
uint64 locallyProcessedRows = 0;
|
||||||
|
List *localTaskList = NIL;
|
||||||
|
List *remoteTaskList = NIL;
|
||||||
|
|
||||||
|
if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
|
||||||
|
{
|
||||||
|
bool readOnlyPlan = false;
|
||||||
|
|
||||||
|
/* set local (if any) & remote tasks */
|
||||||
|
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
|
||||||
|
&remoteTaskList);
|
||||||
|
locallyProcessedRows += ExecuteLocalTaskList(localTaskList, NULL,
|
||||||
|
NULL,
|
||||||
|
tupleStore);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
remoteTaskList = taskList;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If current transaction accessed local placements and task list includes
|
* If current transaction accessed local placements and task list includes
|
||||||
|
@ -946,7 +969,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
||||||
* then we should error out as it would cause inconsistencies across the
|
* then we should error out as it would cause inconsistencies across the
|
||||||
* remote connection and local execution.
|
* remote connection and local execution.
|
||||||
*/
|
*/
|
||||||
if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList))
|
if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(remoteTaskList))
|
||||||
{
|
{
|
||||||
ErrorIfTransactionAccessedPlacementsLocally();
|
ErrorIfTransactionAccessedPlacementsLocally();
|
||||||
}
|
}
|
||||||
|
@ -957,7 +980,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
||||||
}
|
}
|
||||||
|
|
||||||
DistributedExecution *execution =
|
DistributedExecution *execution =
|
||||||
CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo,
|
CreateDistributedExecution(modLevel, remoteTaskList, hasReturning, paramListInfo,
|
||||||
tupleDescriptor, tupleStore, targetPoolSize,
|
tupleDescriptor, tupleStore, targetPoolSize,
|
||||||
xactProperties, jobIdList);
|
xactProperties, jobIdList);
|
||||||
|
|
||||||
|
@ -965,7 +988,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
||||||
RunDistributedExecution(execution);
|
RunDistributedExecution(execution);
|
||||||
FinishDistributedExecution(execution);
|
FinishDistributedExecution(execution);
|
||||||
|
|
||||||
return execution->rowsProcessed;
|
return execution->rowsProcessed + locallyProcessedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -553,7 +553,7 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
|
||||||
PlannedStmt *
|
PlannedStmt *
|
||||||
GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
|
GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
|
||||||
{
|
{
|
||||||
if (distributedPlan->workerJob == NULL)
|
if (distributedPlan == NULL || distributedPlan->workerJob == NULL)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -413,7 +413,7 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
|
||||||
|
|
||||||
ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor,
|
ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor,
|
||||||
resultStore, hasReturning, targetPoolSize, &xactProperties,
|
resultStore, hasReturning, targetPoolSize, &xactProperties,
|
||||||
NIL);
|
NIL, false);
|
||||||
|
|
||||||
return resultStore;
|
return resultStore;
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,10 +270,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
|
|
||||||
scanState->tuplestorestate =
|
scanState->tuplestorestate =
|
||||||
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
||||||
|
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
||||||
uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState,
|
uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE,
|
||||||
taskList,
|
taskList, tupleDescriptor,
|
||||||
ROW_MODIFY_COMMUTATIVE,
|
scanState->tuplestorestate,
|
||||||
hasReturning);
|
hasReturning);
|
||||||
|
|
||||||
executorState->es_processed = rowsInserted;
|
executorState->es_processed = rowsInserted;
|
||||||
|
@ -331,8 +331,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
Assert(scanState->tuplestorestate == NULL);
|
Assert(scanState->tuplestorestate == NULL);
|
||||||
scanState->tuplestorestate =
|
scanState->tuplestorestate =
|
||||||
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
||||||
ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList,
|
|
||||||
ROW_MODIFY_COMMUTATIVE,
|
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
||||||
|
ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
|
||||||
|
tupleDescriptor, scanState->tuplestorestate,
|
||||||
hasReturning);
|
hasReturning);
|
||||||
|
|
||||||
if (SortReturning && hasReturning)
|
if (SortReturning && hasReturning)
|
||||||
|
|
|
@ -79,6 +79,7 @@
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
|
#include "distributed/query_utils.h"
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/local_executor.h"
|
#include "distributed/local_executor.h"
|
||||||
|
@ -113,6 +114,8 @@ static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
|
||||||
Tuplestorestate *tupleStoreState, ParamListInfo
|
Tuplestorestate *tupleStoreState, ParamListInfo
|
||||||
paramListInfo);
|
paramListInfo);
|
||||||
static void LogLocalCommand(Task *task);
|
static void LogLocalCommand(Task *task);
|
||||||
|
static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
|
||||||
|
Tuplestorestate *tupleStoreState);
|
||||||
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
|
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
|
||||||
Oid **parameterTypes,
|
Oid **parameterTypes,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
|
@ -197,11 +200,20 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo,
|
||||||
taskNumParams = 0;
|
taskNumParams = 0;
|
||||||
taskParameterTypes = NULL;
|
taskParameterTypes = NULL;
|
||||||
}
|
}
|
||||||
|
List *queryStrings = SplitIntoQueries(TaskQueryString(task));
|
||||||
|
if (list_length(queryStrings) > 1)
|
||||||
|
{
|
||||||
|
LogLocalCommand(task);
|
||||||
|
totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(queryStrings,
|
||||||
|
tupleStoreState);
|
||||||
|
return totalRowsProcessed;
|
||||||
|
}
|
||||||
|
|
||||||
Query *shardQuery = ParseQueryString(TaskQueryString(task),
|
Query *shardQuery = ParseQueryString(linitial(queryStrings),
|
||||||
taskParameterTypes,
|
taskParameterTypes,
|
||||||
taskNumParams);
|
taskNumParams);
|
||||||
|
|
||||||
|
|
||||||
int cursorOptions = 0;
|
int cursorOptions = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -230,48 +242,28 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
static uint64
|
||||||
* ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks
|
LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleStoreState)
|
||||||
* if local execution can be used and executes them.
|
|
||||||
*/
|
|
||||||
uint64
|
|
||||||
ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
|
|
||||||
List *taskList, RowModifyLevel rowModifyLevel, bool
|
|
||||||
hasReturning)
|
|
||||||
{
|
{
|
||||||
uint64 processedRows = 0;
|
char *queryString = NULL;
|
||||||
List *localTaskList = NIL;
|
uint64 totalProcessedRows = 0;
|
||||||
List *remoteTaskList = NIL;
|
if (tupleStoreState == NULL)
|
||||||
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
|
||||||
|
|
||||||
if (ShouldExecuteTasksLocally(taskList))
|
|
||||||
{
|
{
|
||||||
bool readOnlyPlan = false;
|
tupleStoreState = tuplestore_begin_heap(true, false, work_mem);
|
||||||
|
|
||||||
/* set local (if any) & remote tasks */
|
|
||||||
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
|
|
||||||
&remoteTaskList);
|
|
||||||
EState *estate = ScanStateGetExecutorState(scanState);
|
|
||||||
processedRows += ExecuteLocalTaskList(localTaskList, estate->es_param_list_info,
|
|
||||||
scanState->distributedPlan,
|
|
||||||
scanState->tuplestorestate);
|
|
||||||
}
|
}
|
||||||
else
|
foreach_ptr(queryString, queryStrings)
|
||||||
{
|
{
|
||||||
/* all tasks should be executed via remote connections */
|
Query *shardQuery = ParseQueryString(queryString,
|
||||||
remoteTaskList = taskList;
|
NULL,
|
||||||
|
0);
|
||||||
|
int cursorOptions = 0;
|
||||||
|
ParamListInfo paramListInfo = NULL;
|
||||||
|
PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo);
|
||||||
|
totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString,
|
||||||
|
tupleStoreState,
|
||||||
|
paramListInfo);
|
||||||
}
|
}
|
||||||
|
return totalProcessedRows;
|
||||||
/* execute remote tasks if any */
|
|
||||||
if (list_length(remoteTaskList) > 0)
|
|
||||||
{
|
|
||||||
processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList,
|
|
||||||
tupleDescriptor,
|
|
||||||
scanState->tuplestorestate,
|
|
||||||
hasReturning);
|
|
||||||
}
|
|
||||||
|
|
||||||
return processedRows;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -546,7 +546,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
||||||
poolSize = MaxAdaptiveExecutorPoolSize;
|
poolSize = MaxAdaptiveExecutorPoolSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize);
|
ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -132,6 +132,31 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SplitIntoQueries returns a list of queries by splitting
|
||||||
|
* the given concatenated query string with delimiter ';'
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
SplitIntoQueries(char *concatenatedQueryString)
|
||||||
|
{
|
||||||
|
List *queries = NIL;
|
||||||
|
rsize_t len = (rsize_t) strlen(concatenatedQueryString);
|
||||||
|
char *delimiter = ";";
|
||||||
|
char *remaining = concatenatedQueryString;
|
||||||
|
char *query = strtok_s(concatenatedQueryString, &len, delimiter, &remaining);
|
||||||
|
while (query != NULL)
|
||||||
|
{
|
||||||
|
queries = lappend(queries, query);
|
||||||
|
if (len == 0)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
query = strtok_s(NULL, &len, delimiter, &remaining);
|
||||||
|
}
|
||||||
|
return queries;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExtractRangeTableEntryWalker walks over a query tree, and finds all range
|
* ExtractRangeTableEntryWalker walks over a query tree, and finds all range
|
||||||
* table entries. For recursing into the query tree, this function uses the
|
* table entries. For recursing into the query tree, this function uses the
|
||||||
|
|
|
@ -11,7 +11,7 @@ extern int MaxAdaptiveExecutorPoolSize;
|
||||||
extern int ExecutorSlowStartInterval;
|
extern int ExecutorSlowStartInterval;
|
||||||
|
|
||||||
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
|
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
|
||||||
int targetPoolSize);
|
int targetPoolSize, bool localExecutionSupported);
|
||||||
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
|
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
|
||||||
int targetPoolSize, List *jobIdList);
|
int targetPoolSize, List *jobIdList);
|
||||||
|
|
||||||
|
|
|
@ -21,9 +21,6 @@ extern bool TransactionAccessedLocalPlacement;
|
||||||
extern bool TransactionConnectedToLocalGroup;
|
extern bool TransactionConnectedToLocalGroup;
|
||||||
|
|
||||||
/* extern function declarations */
|
/* extern function declarations */
|
||||||
extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
|
|
||||||
List *taskList, RowModifyLevel
|
|
||||||
rowModifyLevel, bool hasReturning);
|
|
||||||
extern uint64 ExecuteLocalTaskList(List *taskList, ParamListInfo paramListInfo,
|
extern uint64 ExecuteLocalTaskList(List *taskList, ParamListInfo paramListInfo,
|
||||||
DistributedPlan *distributedPlan,
|
DistributedPlan *distributedPlan,
|
||||||
Tuplestorestate *tupleStoreState);
|
Tuplestorestate *tupleStoreState);
|
||||||
|
|
|
@ -79,15 +79,14 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
||||||
Tuplestorestate *tupleStore,
|
Tuplestorestate *tupleStore,
|
||||||
bool hasReturning, int targetPoolSize,
|
bool hasReturning, int targetPoolSize,
|
||||||
TransactionProperties *xactProperties,
|
TransactionProperties *xactProperties,
|
||||||
List *jobIdList);
|
List *jobIdList,
|
||||||
|
bool localExecutionSupported);
|
||||||
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
Tuplestorestate *tupleStore,
|
Tuplestorestate *tupleStore,
|
||||||
bool hasReturning);
|
bool hasReturning);
|
||||||
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool
|
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool
|
||||||
localExecutionSupported);
|
localExecutionSupported);
|
||||||
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int
|
|
||||||
targetPoolSize);
|
|
||||||
extern bool IsCitusCustomState(PlanState *planState);
|
extern bool IsCitusCustomState(PlanState *planState);
|
||||||
extern TupleTableSlot * CitusExecScan(CustomScanState *node);
|
extern TupleTableSlot * CitusExecScan(CustomScanState *node);
|
||||||
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
|
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
|
||||||
|
|
|
@ -34,6 +34,7 @@ extern bool ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *co
|
||||||
/* Below two functions wrap ExtractRangeTableList function to determine the execution flow */
|
/* Below two functions wrap ExtractRangeTableList function to determine the execution flow */
|
||||||
extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
|
extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
|
||||||
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
|
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
|
||||||
|
extern List * SplitIntoQueries(char *concatenatedQueryString);
|
||||||
|
|
||||||
extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList);
|
extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList);
|
||||||
|
|
||||||
|
|
|
@ -489,6 +489,38 @@ BEGIN;
|
||||||
COPY reference_table FROM STDIN;
|
COPY reference_table FROM STDIN;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
SET citus.enable_local_execution = 'on';
|
SET citus.enable_local_execution = 'on';
|
||||||
|
CREATE TABLE ref_table(a int);
|
||||||
|
INSERT INTO ref_table VALUES(1);
|
||||||
|
BEGIN;
|
||||||
|
-- trigger local execution
|
||||||
|
SELECT COUNT(*) FROM reference_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.reference_table_1570000 reference_table
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- shard creation should be done locally
|
||||||
|
SELECT create_reference_table('ref_table');
|
||||||
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)')
|
||||||
|
NOTICE: executing the copy locally for shard xxxxx
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO ref_table VALUES(2);
|
||||||
|
NOTICE: executing the command locally: INSERT INTO local_shard_copy.ref_table_1330000 (a) VALUES (2)
|
||||||
|
-- verify that it worked.
|
||||||
|
SELECT COUNT(*) FROM ref_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.ref_table_1330000 ref_table
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
SET search_path TO public;
|
SET search_path TO public;
|
||||||
DROP SCHEMA local_shard_copy CASCADE;
|
DROP SCHEMA local_shard_copy CASCADE;
|
||||||
|
|
|
@ -565,7 +565,11 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
|
||||||
-- would error out
|
-- would error out
|
||||||
CREATE TABLE another_dist_table(a int);
|
CREATE TABLE another_dist_table(a int);
|
||||||
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
|
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
|
||||||
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
------------ partitioned tables -------------
|
------------ partitioned tables -------------
|
||||||
|
@ -721,7 +725,7 @@ TRUNCATE partitioning_test;
|
||||||
DROP TABLE partitioning_test;
|
DROP TABLE partitioning_test;
|
||||||
-- cleanup at exit
|
-- cleanup at exit
|
||||||
DROP SCHEMA local_commands_test_schema CASCADE;
|
DROP SCHEMA local_commands_test_schema CASCADE;
|
||||||
NOTICE: drop cascades to 16 other objects
|
NOTICE: drop cascades to 18 other objects
|
||||||
DROP SCHEMA foo_schema;
|
DROP SCHEMA foo_schema;
|
||||||
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||||
?column?
|
?column?
|
||||||
|
|
|
@ -341,6 +341,20 @@ ROLLBACK;
|
||||||
|
|
||||||
SET citus.enable_local_execution = 'on';
|
SET citus.enable_local_execution = 'on';
|
||||||
|
|
||||||
|
CREATE TABLE ref_table(a int);
|
||||||
|
INSERT INTO ref_table VALUES(1);
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- trigger local execution
|
||||||
|
SELECT COUNT(*) FROM reference_table;
|
||||||
|
-- shard creation should be done locally
|
||||||
|
SELECT create_reference_table('ref_table');
|
||||||
|
INSERT INTO ref_table VALUES(2);
|
||||||
|
|
||||||
|
-- verify that it worked.
|
||||||
|
SELECT COUNT(*) FROM ref_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
SET search_path TO public;
|
SET search_path TO public;
|
||||||
DROP SCHEMA local_shard_copy CASCADE;
|
DROP SCHEMA local_shard_copy CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue