From 0ea4e52df512acd678ed7ad0cb128638221a3a5a Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 7 Mar 2019 11:42:03 +0100 Subject: [PATCH 1/4] Add nodeId to shardPlacements and use it for shard placement comparisons Before this commit, shardPlacements were identified with shardId, nodeName and nodeport. Instead of using nodeName and nodePort, we now use nodeId since it apparently has performance benefits in several places in the code. --- src/backend/distributed/commands/multi_copy.c | 7 +++++++ .../connection/placement_connection.c | 12 ++++-------- .../planner/multi_physical_planner.c | 4 ++-- .../distributed/planner/multi_router_planner.c | 1 + src/backend/distributed/utils/citus_copyfuncs.c | 1 + src/backend/distributed/utils/citus_outfuncs.c | 1 + src/backend/distributed/utils/citus_readfuncs.c | 1 + .../distributed/utils/colocation_utils.c | 17 ++--------------- src/backend/distributed/utils/metadata_cache.c | 1 + .../distributed/master_metadata_utility.h | 1 + 10 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index f22878f6a..a96d87529 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1051,6 +1051,13 @@ RemoteFinalizedShardPlacementList(uint64 shardId) shardPlacement->nodeName = nodeName; shardPlacement->nodePort = nodePort; + /* + * We cannot know the nodeId, but it is not necessary at this point either. + * This is only used to look up the connection for a group of co-located + * placements, but append-distributed tables are never co-located. 
+ */ + shardPlacement->nodeId = -1; + finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement); } } diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 15e7b7551..20303e58b 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -133,8 +133,7 @@ static HTAB *ConnectionPlacementHash; typedef struct ColocatedPlacementsHashKey { /* to identify host - database can't differ */ - char nodeName[MAX_NODE_LENGTH]; - uint32 nodePort; + int nodeId; /* colocation group, or invalid */ uint32 colocationGroupId; @@ -689,8 +688,7 @@ FindOrCreatePlacementEntry(ShardPlacement *placement) ColocatedPlacementsHashKey key; ColocatedPlacementsHashEntry *colocatedEntry = NULL; - strlcpy(key.nodeName, placement->nodeName, MAX_NODE_LENGTH); - key.nodePort = placement->nodePort; + key.nodeId = placement->nodeId; key.colocationGroupId = placement->colocationGroupId; key.representativeValue = placement->representativeValue; @@ -1149,8 +1147,7 @@ ColocatedPlacementsHashHash(const void *key, Size keysize) ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key; uint32 hash = 0; - hash = string_hash(entry->nodeName, NAMEDATALEN); - hash = hash_combine(hash, hash_uint32(entry->nodePort)); + hash = hash_uint32(entry->nodeId); hash = hash_combine(hash, hash_uint32(entry->colocationGroupId)); hash = hash_combine(hash, hash_uint32(entry->representativeValue)); @@ -1164,8 +1161,7 @@ ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize) ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a; ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b; - if (strncmp(ca->nodeName, cb->nodeName, MAX_NODE_LENGTH) != 0 || - ca->nodePort != cb->nodePort || + if (ca->nodeId != cb->nodeId || ca->colocationGroupId != cb->colocationGroupId || ca->representativeValue != 
cb->representativeValue) { diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 8e359239f..a12572ad5 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2552,8 +2552,7 @@ CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterv ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst( secondShardPlacementCell); - if (strcmp(firstShardPlacement->nodeName, secondShardPlacement->nodeName) != 0 || - firstShardPlacement->nodePort != secondShardPlacement->nodePort) + if (firstShardPlacement->nodeId != secondShardPlacement->nodeId) { return false; } @@ -5375,6 +5374,7 @@ AssignDualHashTaskList(List *taskList) ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); taskPlacement->nodeName = pstrdup(workerNode->workerName); taskPlacement->nodePort = workerNode->workerPort; + taskPlacement->nodeId = workerNode->nodeId; taskPlacementList = lappend(taskPlacementList, taskPlacement); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 8694e44ed..854c40309 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2027,6 +2027,7 @@ PlanRouterQuery(Query *originalQuery, (ShardPlacement *) CitusMakeNode(ShardPlacement); dummyPlacement->nodeName = workerNode->workerName; dummyPlacement->nodePort = workerNode->workerPort; + dummyPlacement->nodeId = workerNode->nodeId; dummyPlacement->groupId = workerNode->groupId; workerList = lappend(workerList, dummyPlacement); diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 757317851..6d302e441 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -200,6 +200,7 
@@ CopyNodeShardPlacement(COPYFUNC_ARGS) COPY_SCALAR_FIELD(groupId); COPY_STRING_FIELD(nodeName); COPY_SCALAR_FIELD(nodePort); + COPY_SCALAR_FIELD(nodeId); COPY_SCALAR_FIELD(partitionMethod); COPY_SCALAR_FIELD(colocationGroupId); COPY_SCALAR_FIELD(representativeValue); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index ca8f77d67..94caafa0e 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -405,6 +405,7 @@ OutShardPlacement(OUTFUNC_ARGS) WRITE_INT_FIELD(groupId); WRITE_STRING_FIELD(nodeName); WRITE_UINT_FIELD(nodePort); + WRITE_INT_FIELD(nodeId); /* so we can deal with 0 */ WRITE_INT_FIELD(partitionMethod); WRITE_UINT_FIELD(colocationGroupId); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index cbd573142..93df55fab 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -315,6 +315,7 @@ ReadShardPlacement(READFUNC_ARGS) READ_INT_FIELD(groupId); READ_STRING_FIELD(nodeName); READ_UINT_FIELD(nodePort); + READ_INT_FIELD(nodeId); /* so we can deal with 0 */ READ_INT_FIELD(partitionMethod); READ_UINT_FIELD(colocationGroupId); diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index ff273a99c..89e3cebb7 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -420,25 +420,12 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement) const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement); const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement); - char *leftNodeName = leftPlacement->nodeName; - char *rightNodeName = rightPlacement->nodeName; - - uint32 leftNodePort = leftPlacement->nodePort; - uint32 rightNodePort = 
rightPlacement->nodePort; - - /* first compare node names */ - int nodeNameCompare = strncmp(leftNodeName, rightNodeName, WORKER_LENGTH); - if (nodeNameCompare != 0) - { - return nodeNameCompare; - } - /* if node names are same, check node ports */ - if (leftNodePort < rightNodePort) + if (leftPlacement->nodeId < rightPlacement->nodeId) { return -1; } - else if (leftNodePort > rightNodePort) + else if (leftPlacement->nodeId > rightPlacement->nodeId) { return 1; } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index f1e0e1f9a..1c397daab 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -526,6 +526,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, shardPlacement->nodeName = pstrdup(workerNode->workerName); shardPlacement->nodePort = workerNode->workerPort; + shardPlacement->nodeId = workerNode->nodeId; /* fill in remaining fields */ Assert(tableEntry->partitionMethod != 0); diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 3fba2b449..093b0697b 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -99,6 +99,7 @@ typedef struct ShardPlacement /* the rest of the fields aren't from pg_dist_placement */ char *nodeName; uint32 nodePort; + int nodeId; char partitionMethod; uint32 colocationGroupId; uint32 representativeValue; From 5ff1821411690cb1922af3d6131ab330f4bcd967 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 4 Mar 2019 13:38:15 +0100 Subject: [PATCH 2/4] Cache the current database name Purely for performance reasons. 
--- .../connection/connection_management.c | 2 +- .../executor/multi_task_tracker_executor.c | 2 +- .../distributed/planner/multi_explain.c | 2 +- .../distributed/utils/metadata_cache.c | 30 +++++++++++++++++++ .../worker/task_tracker_protocol.c | 2 +- .../worker/worker_data_fetch_protocol.c | 2 +- src/include/distributed/metadata_cache.h | 3 ++ 7 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index c8c509a70..f95f5f8ab 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -251,7 +251,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } else { - strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN); + strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN); } if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index ae4dcd682..ae175f510 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -871,7 +871,7 @@ TrackerConnectPoll(TaskTracker *taskTracker) { char *nodeName = taskTracker->workerName; uint32 nodePort = taskTracker->workerPort; - char *nodeDatabase = get_database_name(MyDatabaseId); + char *nodeDatabase = CurrentDatabaseName(); char *nodeUser = taskTracker->userName; int32 connectionId = MultiClientConnectStart(nodeName, nodePort, diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 8b92b8578..5d4036eb0 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -507,7 +507,7 @@ ExplainTaskPlacement(ShardPlacement *taskPlacement, List 
*explainOutputList, StringInfo nodeAddress = makeStringInfo(); char *nodeName = taskPlacement->nodeName; uint32 nodePort = taskPlacement->nodePort; - char *nodeDatabase = get_database_name(MyDatabaseId); + char *nodeDatabase = CurrentDatabaseName(); ListCell *explainOutputCell = NULL; int rowIndex = 0; diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 1c397daab..14159a3ae 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -24,6 +24,7 @@ #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" #include "citus_version.h" +#include "commands/dbcommands.h" #include "commands/extension.h" #include "commands/trigger.h" #include "distributed/colocation_utils.h" @@ -137,6 +138,8 @@ typedef struct MetadataCacheData Oid unavailableNodeRoleId; Oid pgTableIsVisibleFuncId; Oid citusTableIsVisibleFuncId; + bool databaseNameValid; + char databaseName[NAMEDATALEN]; } MetadataCacheData; @@ -2102,6 +2105,33 @@ CitusTableVisibleFuncId(void) } +/* + * CurrentDatabaseName gets the name of the current database and caches + * the result. + * + * Given that the database name cannot be changed when there is at least + * one session connected to it, we do not need to implement any invalidation + * mechanism. + */ +char * +CurrentDatabaseName(void) +{ + if (!MetadataCache.databaseNameValid) + { + char *databaseName = get_database_name(MyDatabaseId); + if (databaseName == NULL) + { + return NULL; + } + + strlcpy(MetadataCache.databaseName, databaseName, NAMEDATALEN); + MetadataCache.databaseNameValid = true; + } + + return MetadataCache.databaseName; +} + + /* * CitusExtensionOwner() returns the owner of the 'citus' extension. 
That user * is, amongst others, used to perform actions a normal user might not be diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 380325a89..d448a2c6a 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -368,7 +368,7 @@ CreateTask(uint64 jobId, uint32 taskId, char *taskCallString) { WorkerTask *workerTask = NULL; uint32 assignmentTime = 0; - char *databaseName = get_database_name(MyDatabaseId); + char *databaseName = CurrentDatabaseName(); char *userName = CurrentUserName(); /* increase task priority for cleanup tasks */ diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 82c50a27c..5d5719aa3 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -248,7 +248,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort, const char *nodeUser, } /* we use the same database name on the master and worker nodes */ - nodeDatabase = get_database_name(MyDatabaseId); + nodeDatabase = CurrentDatabaseName(); /* connect to remote node */ connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, nodeUser); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 6ade86ed8..dfe2bed5e 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -167,4 +167,7 @@ extern Oid BinaryCopyFormatId(void); extern Oid CitusExtensionOwner(void); extern char * CitusExtensionOwnerName(void); extern char * CurrentUserName(void); +extern char * CurrentDatabaseName(void); + + #endif /* METADATA_CACHE_H */ From ee6a0b69430440cc138557cbaf8315fd51138d27 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 4 Mar 2019 13:45:35 +0100 Subject: [PATCH 3/4] Speed up RTE 
walkers Do it in two ways (a) re-use the rte list as much as possible instead of re-calculating over and over again (b) Limit the recursion to the relevant parts of the query tree --- .../distributed/planner/distributed_planner.c | 108 ++++++++++-------- .../planner/multi_logical_planner.c | 27 ++++- .../planner/query_pushdown_planning.c | 1 + src/include/distributed/distributed_planner.h | 1 + 4 files changed, 85 insertions(+), 52 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index b49be5790..29d49bd1e 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -51,7 +51,7 @@ static uint64 NextPlanId = 1; /* local function forward declarations */ -static bool NeedsDistributedPlanningWalker(Node *node, void *context); +static bool ListContainsDistributedTableRTE(List *rangeTableList); static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, @@ -65,9 +65,9 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); -static void AssignRTEIdentities(Query *queryTree); +static void AssignRTEIdentities(List *rangeTableList); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); -static void AdjustPartitioningForDistributedPlanning(Query *parse, +static void AdjustPartitioningForDistributedPlanning(List *rangeTableList, bool setPartitionedTablesInherited); static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan); @@ -93,15 +93,23 @@ PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result = NULL; - bool needsDistributedPlanning = NeedsDistributedPlanning(parse); + bool 
needsDistributedPlanning = false; Query *originalQuery = NULL; PlannerRestrictionContext *plannerRestrictionContext = NULL; bool setPartitionedTablesInherited = false; + List *rangeTableList = ExtractRangeTableEntryList(parse); if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { + /* this cursor flag could only be set when Citus has been loaded */ + Assert(CitusHasBeenLoaded()); + needsDistributedPlanning = true; } + else if (CitusHasBeenLoaded()) + { + needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); + } if (needsDistributedPlanning) { @@ -127,11 +135,12 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * of the query tree. Note that we copy the query tree once we're sure it's a * distributed query. */ - AssignRTEIdentities(parse); + AssignRTEIdentities(rangeTableList); originalQuery = copyObject(parse); setPartitionedTablesInherited = false; - AdjustPartitioningForDistributedPlanning(parse, setPartitionedTablesInherited); + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); } /* @@ -171,7 +180,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) boundParams, plannerRestrictionContext); setPartitionedTablesInherited = true; - AdjustPartitioningForDistributedPlanning(parse, + AdjustPartitioningForDistributedPlanning(rangeTableList, setPartitionedTablesInherited); } } @@ -205,6 +214,22 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } +/* + * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker. + * The function traverses the input query and returns all the range table + * entries that are in the query tree. 
+ */ +List * +ExtractRangeTableEntryList(Query *query) +{ + List *rangeTblList = NIL; + + ExtractRangeTableEntryWalker((Node *) query, &rangeTblList); + + return rangeTblList; +} + + /* * NeedsDistributedPlanning returns true if the Citus extension is loaded and * the query contains a distributed table. @@ -216,61 +241,52 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) bool NeedsDistributedPlanning(Query *query) { + List *allRTEs = NIL; CmdType commandType = query->commandType; - if (commandType != CMD_SELECT && commandType != CMD_INSERT && - commandType != CMD_UPDATE && commandType != CMD_DELETE) - { - return false; - } if (!CitusHasBeenLoaded()) { return false; } - if (!NeedsDistributedPlanningWalker((Node *) query, NULL)) + if (commandType != CMD_SELECT && commandType != CMD_INSERT && + commandType != CMD_UPDATE && commandType != CMD_DELETE) { return false; } - return true; + ExtractRangeTableEntryWalker((Node *) query, &allRTEs); + + return ListContainsDistributedTableRTE(allRTEs); } /* - * NeedsDistributedPlanningWalker checks if the query contains any distributed - * tables. + * ListContainsDistributedTableRTE gets a list of range table entries + * and returns true if there is at least one distributed relation range + * table entry in the list. 
*/ static bool -NeedsDistributedPlanningWalker(Node *node, void *context) +ListContainsDistributedTableRTE(List *rangeTableList) { - if (node == NULL) - { - return false; - } + ListCell *rangeTableCell = NULL; - if (IsA(node, Query)) + foreach(rangeTableCell, rangeTableList) { - Query *query = (Query *) node; - ListCell *rangeTableCell = NULL; + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - foreach(rangeTableCell, query->rtable) + if (rangeTableEntry->rtekind != RTE_RELATION) { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - Oid relationId = rangeTableEntry->relid; - if (IsDistributedTable(relationId)) - { - return true; - } + continue; } - return query_tree_walker(query, NeedsDistributedPlanningWalker, NULL, 0); - } - else - { - return expression_tree_walker(node, NeedsDistributedPlanningWalker, NULL); + if (IsDistributedTable(rangeTableEntry->relid)) + { + return true; + } } + + return false; } @@ -283,15 +299,11 @@ NeedsDistributedPlanningWalker(Node *node, void *context) * our logic. */ static void -AssignRTEIdentities(Query *queryTree) +AssignRTEIdentities(List *rangeTableList) { - List *rangeTableList = NIL; ListCell *rangeTableCell = NULL; int rteIdentifier = 1; - /* extract range table entries for simple relations only */ - ExtractRangeTableEntryWalker((Node *) queryTree, &rangeTableList); - foreach(rangeTableCell, rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); @@ -325,15 +337,11 @@ AssignRTEIdentities(Query *queryTree) * our logic. 
*/ static void -AdjustPartitioningForDistributedPlanning(Query *queryTree, +AdjustPartitioningForDistributedPlanning(List *rangeTableList, bool setPartitionedTablesInherited) { - List *rangeTableList = NIL; ListCell *rangeTableCell = NULL; - /* extract range table entries for simple relations only */ - ExtractRangeTableEntryWalker((Node *) queryTree, &rangeTableList); - foreach(rangeTableCell, rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); @@ -344,7 +352,8 @@ AdjustPartitioningForDistributedPlanning(Query *queryTree, * we set each distributed partitioned table's inh flag to appropriate * value before and after dropping to the standart_planner. */ - if (IsDistributedTable(rangeTableEntry->relid) && + if (rangeTableEntry->rtekind == RTE_RELATION && + IsDistributedTable(rangeTableEntry->relid) && PartitionedTable(rangeTableEntry->relid)) { rangeTableEntry->inh = setPartitionedTablesInherited; @@ -724,7 +733,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi * distributed_planner, but on a copy of the original query, so we need * to do it again here. 
 */ - AdjustPartitioningForDistributedPlanning(newQuery, setPartitionedTablesInherited); + AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery), + setPartitionedTablesInherited); /* * Some relations may have been removed from the query, but we can skip diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 39a475247..d874e8899 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -2026,9 +2026,30 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList) } else if (IsA(node, Query)) { - walkIsComplete = query_tree_walker((Query *) node, - ExtractRangeTableEntryWalker, - rangeTableList, QTW_EXAMINE_RTES); + Query *query = (Query *) node; + + /* + * Since we're only interested in range table entries, we only descend + * into all parts of the query when it is necessary. Otherwise, it is + * sufficient to descend into range table list since it's the only part + * of the query that could contain range table entries. 
+ */ + if (query->hasSubLinks || query->cteList || query->setOperations) + { + /* descend into all parts of the query */ + walkIsComplete = query_tree_walker((Query *) node, + ExtractRangeTableEntryWalker, + rangeTableList, + QTW_EXAMINE_RTES); + } + else + { + /* descend only into RTEs */ + walkIsComplete = range_table_walker(query->rtable, + ExtractRangeTableEntryWalker, + rangeTableList, + QTW_EXAMINE_RTES); + } } else { diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 219600906..ac0845a14 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -1505,6 +1505,7 @@ SubqueryPushdownMultiNodeTree(Query *queryTree) pushedDownQuery->rtable = copyObject(queryTree->rtable); pushedDownQuery->setOperations = copyObject(queryTree->setOperations); pushedDownQuery->querySource = queryTree->querySource; + pushedDownQuery->hasSubLinks = queryTree->hasSubLinks; subqueryNode = MultiSubqueryPushdownTable(pushedDownQuery); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 4c76dcb49..116c7701d 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -83,6 +83,7 @@ typedef struct RelationRowLock extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); +extern List * ExtractRangeTableEntryList(Query *query); extern bool NeedsDistributedPlanning(Query *query); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, From e8152d9b6d75975f072c0808348dffc0f81a1561 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 4 Mar 2019 14:27:34 +0100 Subject: [PATCH 4/4] Only look in top-level rtable in ExtractFirstDistributedTableId --- 
src/backend/distributed/planner/multi_router_planner.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 854c40309..914506faa 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1543,16 +1543,18 @@ CreateTask(TaskType taskType) * ExtractFirstDistributedTableId takes a given query, and finds the relationId * for the first distributed table in that query. If the function cannot find a * distributed table, it returns InvalidOid. + * + * We only use this function for modifications and fast path queries, which + * should have the first distributed table in the top-level rtable. */ Oid ExtractFirstDistributedTableId(Query *query) { - List *rangeTableList = NIL; + List *rangeTableList = query->rtable; ListCell *rangeTableCell = NULL; Oid distributedTableId = InvalidOid; - /* extract range table entries */ - ExtractRangeTableEntryWalker((Node *) query, &rangeTableList); + Assert(IsModifyCommand(query) || FastPathRouterQuery(query)); foreach(rangeTableCell, rangeTableList) {