Pass arbitrary ddl commands through adaptive executor

hack-local-execution
Onur Tirtir 2021-01-21 11:22:11 +03:00
parent 9b39b25390
commit 93dac559b2
4 changed files with 16 additions and 39 deletions

View File

@ -21,10 +21,12 @@
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/foreign_key_relationship.h" #include "distributed/foreign_key_relationship.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
@ -423,11 +425,16 @@ ExecuteAndLogDDLCommandList(List *ddlCommandList)
void void
ExecuteAndLogDDLCommand(const char *commandString) ExecuteAndLogDDLCommand(const char *commandString)
{ {
ereport(DEBUG4, (errmsg("executing \"%s\"", commandString))); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
Node *parseTree = ParseTreeNode(commandString); Task *task = CitusMakeNode(Task);
ProcessUtilityParseTree(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, task->taskType = DDL_TASK;
NULL, None_Receiver, NULL); SetTaskQueryString(task, pstrdup(commandString));
task->replicationModel = REPLICATION_MODEL_INVALID;
task->taskPlacementList = list_make1(CreateLocalDummyPlacement());
ddlJob->taskList = list_make1(task);
ExecuteDistributedDDLJob(ddlJob);
} }
@ -456,28 +463,6 @@ ExecuteForeignKeyCreateCommandList(List *ddlCommandList, bool skip_validation)
static void static void
ExecuteForeignKeyCreateCommand(const char *commandString, bool skip_validation) ExecuteForeignKeyCreateCommand(const char *commandString, bool skip_validation)
{ {
ereport(DEBUG4, (errmsg("executing foreign key create command \"%s\"", /* TODO: find a way pass skip_validation to executor */
commandString))); ExecuteAndLogDDLCommand(commandString);
Node *parseTree = ParseTreeNode(commandString);
/*
* We might have thrown an error if IsA(parseTree, AlterTableStmt),
* but that doesn't seem to provide any benefits, so assertion is
* fine for this case.
*/
Assert(IsA(parseTree, AlterTableStmt));
if (skip_validation && IsA(parseTree, AlterTableStmt))
{
parseTree =
SkipForeignKeyValidationIfConstraintIsFkey((AlterTableStmt *) parseTree,
true);
ereport(DEBUG4, (errmsg("skipping validation for foreign key create "
"command \"%s\"", commandString)));
}
ProcessUtilityParseTree(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
} }

View File

@ -114,14 +114,6 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
/* enable create_citus_local_table on an empty node */ /* enable create_citus_local_table on an empty node */
InsertCoordinatorIfClusterEmpty(); InsertCoordinatorIfClusterEmpty();
/*
* Creating Citus local tables relies on functions that accesses
* shards locally (e.g., ExecuteAndLogDDLCommand()). As long as
* we don't teach those functions to access shards remotely, we
* cannot relax this check.
*/
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
/* /*
* Lock target relation with an AccessExclusiveLock as we don't want * Lock target relation with an AccessExclusiveLock as we don't want
* multiple backends manipulating this relation. We could actually simply * multiple backends manipulating this relation. We could actually simply

View File

@ -165,7 +165,6 @@ static DeferredErrorMessage * DeferErrorIfUnsupportedRouterPlannableSelectQuery(
static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree); static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree);
static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); static bool SelectsFromDistributedTable(List *rangeTableList, Query *query);
static ShardPlacement * CreateDummyPlacement(bool hasLocalRelation); static ShardPlacement * CreateDummyPlacement(bool hasLocalRelation);
static ShardPlacement * CreateLocalDummyPlacement();
static int CompareInsertValuesByShardId(const void *leftElement, static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement); const void *rightElement);
static List * SingleShardTaskList(Query *query, uint64 jobId, static List * SingleShardTaskList(Query *query, uint64 jobId,
@ -2428,8 +2427,8 @@ CreateTaskPlacementListForShardIntervals(List *shardIntervalListList, bool shard
* (a) queries that consist of only intermediate results * (a) queries that consist of only intermediate results
* (b) queries that hit zero shards (... WHERE false;) * (b) queries that hit zero shards (... WHERE false;)
*/ */
static ShardPlacement * ShardPlacement *
CreateLocalDummyPlacement() CreateLocalDummyPlacement(void)
{ {
ShardPlacement *dummyPlacement = CitusMakeNode(ShardPlacement); ShardPlacement *dummyPlacement = CitusMakeNode(ShardPlacement);
dummyPlacement->nodeId = LOCAL_NODE_ID; dummyPlacement->nodeId = LOCAL_NODE_ID;

View File

@ -50,6 +50,7 @@ extern List * CreateTaskPlacementListForShardIntervals(List *shardIntervalList,
bool shardsPresent, bool shardsPresent,
bool generateDummyPlacement, bool generateDummyPlacement,
bool hasLocalRelation); bool hasLocalRelation);
extern ShardPlacement * CreateLocalDummyPlacement(void);
extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved, extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
DeferredErrorMessage **planningError); DeferredErrorMessage **planningError);
extern Const * ExtractInsertPartitionKeyValue(Query *query); extern Const * ExtractInsertPartitionKeyValue(Query *query);