mirror of https://github.com/citusdata/citus.git

Merge pull request #2633 from citusdata/trivial_parts_of_faster_all_things

Decrease CPU overhead of some of the planner functions

commit 67ecbe821a
@@ -1051,6 +1051,13 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
         shardPlacement->nodeName = nodeName;
         shardPlacement->nodePort = nodePort;
+
+        /*
+         * We cannot know the nodeId, but it is not necessary at this point either.
+         * This is only used to look up the connection for a group of co-located
+         * placements, but append-distributed tables are never co-located.
+         */
+        shardPlacement->nodeId = -1;

         finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement);
     }
 }

@@ -251,7 +251,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
     }
     else
     {
-        strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN);
+        strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
     }

     if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)

@@ -133,8 +133,7 @@ static HTAB *ConnectionPlacementHash;
 typedef struct ColocatedPlacementsHashKey
 {
     /* to identify host - database can't differ */
-    char nodeName[MAX_NODE_LENGTH];
-    uint32 nodePort;
+    int nodeId;

     /* colocation group, or invalid */
     uint32 colocationGroupId;

@@ -689,8 +688,7 @@ FindOrCreatePlacementEntry(ShardPlacement *placement)
     ColocatedPlacementsHashKey key;
     ColocatedPlacementsHashEntry *colocatedEntry = NULL;

-    strlcpy(key.nodeName, placement->nodeName, MAX_NODE_LENGTH);
-    key.nodePort = placement->nodePort;
+    key.nodeId = placement->nodeId;
     key.colocationGroupId = placement->colocationGroupId;
     key.representativeValue = placement->representativeValue;

@@ -1149,8 +1147,7 @@ ColocatedPlacementsHashHash(const void *key, Size keysize)
     ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key;
     uint32 hash = 0;

-    hash = string_hash(entry->nodeName, NAMEDATALEN);
-    hash = hash_combine(hash, hash_uint32(entry->nodePort));
+    hash = hash_uint32(entry->nodeId);
     hash = hash_combine(hash, hash_uint32(entry->colocationGroupId));
     hash = hash_combine(hash, hash_uint32(entry->representativeValue));

@@ -1164,8 +1161,7 @@ ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
     ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a;
     ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b;

-    if (strncmp(ca->nodeName, cb->nodeName, MAX_NODE_LENGTH) != 0 ||
-        ca->nodePort != cb->nodePort ||
+    if (ca->nodeId != cb->nodeId ||
         ca->colocationGroupId != cb->colocationGroupId ||
         ca->representativeValue != cb->representativeValue)
     {

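The four hunks above replace a string-based hash key (nodeName plus nodePort) with a single integer nodeId, so both hashing and comparison become constant-cost integer operations instead of scans over MAX_NODE_LENGTH or NAMEDATALEN bytes. A minimal standalone sketch of the pattern: hash_combine mirrors the boost-style mixer PostgreSQL defines in hashutils.h, hash_u32 is a murmur-style stand-in for PostgreSQL's hash_uint32, and PlacementKey is a toy type, not the Citus struct.

    #include <stdint.h>
    #include <stdio.h>

    /* same mixing step as PostgreSQL's hash_combine() in hashutils.h */
    static uint32_t
    hash_combine(uint32_t a, uint32_t b)
    {
        a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2);
        return a;
    }

    /* stand-in for hash_uint32(): a murmur3-style integer finalizer */
    static uint32_t
    hash_u32(uint32_t k)
    {
        k ^= k >> 16;
        k *= 0x85ebca6b;
        k ^= k >> 13;
        k *= 0xc2b2ae35;
        k ^= k >> 16;
        return k;
    }

    typedef struct PlacementKey
    {
        int nodeId;                   /* replaces nodeName + nodePort */
        uint32_t colocationGroupId;
        uint32_t representativeValue;
    } PlacementKey;

    /* hashing three fixed-width integers: no string scan at all */
    static uint32_t
    PlacementKeyHash(const PlacementKey *key)
    {
        uint32_t hash = hash_u32((uint32_t) key->nodeId);
        hash = hash_combine(hash, hash_u32(key->colocationGroupId));
        hash = hash_combine(hash, hash_u32(key->representativeValue));
        return hash;
    }

    int
    main(void)
    {
        PlacementKey key = { .nodeId = 3, .colocationGroupId = 7, .representativeValue = 42 };
        printf("%08x\n", PlacementKeyHash(&key));
        return 0;
    }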
@@ -871,7 +871,7 @@ TrackerConnectPoll(TaskTracker *taskTracker)
     {
         char *nodeName = taskTracker->workerName;
         uint32 nodePort = taskTracker->workerPort;
-        char *nodeDatabase = get_database_name(MyDatabaseId);
+        char *nodeDatabase = CurrentDatabaseName();
         char *nodeUser = taskTracker->userName;

         int32 connectionId = MultiClientConnectStart(nodeName, nodePort,

@@ -51,7 +51,7 @@ static uint64 NextPlanId = 1;


 /* local function forward declarations */
-static bool NeedsDistributedPlanningWalker(Node *node, void *context);
+static bool ListContainsDistributedTableRTE(List *rangeTableList);
 static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan,
                                                   Query *originalQuery, Query *query,
                                                   ParamListInfo boundParams,

@@ -65,9 +65,9 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue
 static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
                                                                             relationId);

-static void AssignRTEIdentities(Query *queryTree);
+static void AssignRTEIdentities(List *rangeTableList);
 static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
-static void AdjustPartitioningForDistributedPlanning(Query *parse,
+static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
                                                      bool setPartitionedTablesInherited);
 static PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
                                   DistributedPlan *distributedPlan);

@@ -93,15 +93,23 @@ PlannedStmt *
 distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 {
     PlannedStmt *result = NULL;
-    bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
+    bool needsDistributedPlanning = false;
     Query *originalQuery = NULL;
     PlannerRestrictionContext *plannerRestrictionContext = NULL;
     bool setPartitionedTablesInherited = false;
+    List *rangeTableList = ExtractRangeTableEntryList(parse);

     if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
     {
+        /* this cursor flag could only be set when Citus has been loaded */
+        Assert(CitusHasBeenLoaded());
+
         needsDistributedPlanning = true;
     }
+    else if (CitusHasBeenLoaded())
+    {
+        needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
+    }

     if (needsDistributedPlanning)
     {

@@ -127,11 +135,12 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
         * of the query tree. Note that we copy the query tree once we're sure it's a
         * distributed query.
         */
-        AssignRTEIdentities(parse);
+        AssignRTEIdentities(rangeTableList);
        originalQuery = copyObject(parse);

        setPartitionedTablesInherited = false;
-       AdjustPartitioningForDistributedPlanning(parse, setPartitionedTablesInherited);
+       AdjustPartitioningForDistributedPlanning(rangeTableList,
+                                                setPartitionedTablesInherited);
     }

     /*

@@ -171,7 +180,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
                                                 boundParams, plannerRestrictionContext);

             setPartitionedTablesInherited = true;
-            AdjustPartitioningForDistributedPlanning(parse,
+            AdjustPartitioningForDistributedPlanning(rangeTableList,
                                                      setPartitionedTablesInherited);
         }
     }

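The distributed_planner() hunks are a hoisting change: the query tree is walked once via ExtractRangeTableEntryList(), and the resulting flat list feeds ListContainsDistributedTableRTE(), AssignRTEIdentities(), and AdjustPartitioningForDistributedPlanning(), each of which previously triggered its own full traversal. A minimal sketch of that shape, with a toy binary tree standing in for the query tree and hypothetical names throughout:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct Node
    {
        int relationId;
        bool isDistributed;
        struct Node *left;
        struct Node *right;
    } Node;

    /* the single up-front traversal: collect every node into a flat list */
    static size_t
    ExtractNodeList(Node *root, Node **out, size_t count)
    {
        if (root == NULL)
        {
            return count;
        }
        out[count++] = root;
        count = ExtractNodeList(root->left, out, count);
        count = ExtractNodeList(root->right, out, count);
        return count;
    }

    /* consumers scan the flat list; none of them re-walks the tree */
    static bool
    ListContainsDistributed(Node **list, size_t count)
    {
        for (size_t i = 0; i < count; i++)
        {
            if (list[i]->isDistributed)
            {
                return true;
            }
        }
        return false;
    }

    static void
    AssignIdentities(Node **list, size_t count)
    {
        for (size_t i = 0; i < count; i++)
        {
            list[i]->relationId = (int) i + 1; /* identities from list order */
        }
    }

    int
    main(void)
    {
        Node leaf = { 42, true, NULL, NULL };
        Node root = { 7, false, &leaf, NULL };
        Node *flat[8];
        size_t count = ExtractNodeList(&root, flat, 0);

        AssignIdentities(flat, count);
        printf("%d\n", ListContainsDistributed(flat, count)); /* prints 1 */
        return 0;
    }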
@@ -205,6 +214,22 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 }


+/*
+ * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker.
+ * The function traverses the input query and returns all the range table
+ * entries that are in the query tree.
+ */
+List *
+ExtractRangeTableEntryList(Query *query)
+{
+    List *rangeTblList = NIL;
+
+    ExtractRangeTableEntryWalker((Node *) query, &rangeTblList);
+
+    return rangeTblList;
+}
+
+
 /*
  * NeedsDistributedPlanning returns true if the Citus extension is loaded and
  * the query contains a distributed table.

@@ -216,61 +241,52 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 bool
 NeedsDistributedPlanning(Query *query)
 {
+    List *allRTEs = NIL;
     CmdType commandType = query->commandType;
-    if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
-        commandType != CMD_UPDATE && commandType != CMD_DELETE)
-    {
-        return false;
-    }

     if (!CitusHasBeenLoaded())
     {
         return false;
     }

-    if (!NeedsDistributedPlanningWalker((Node *) query, NULL))
+    if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
+        commandType != CMD_UPDATE && commandType != CMD_DELETE)
     {
         return false;
     }

-    return true;
+    ExtractRangeTableEntryWalker((Node *) query, &allRTEs);
+
+    return ListContainsDistributedTableRTE(allRTEs);
 }


 /*
- * NeedsDistributedPlanningWalker checks if the query contains any distributed
- * tables.
+ * ListContainsDistributedTableRTE gets a list of range table entries
+ * and returns true if there is at least one distributed relation range
+ * table entry in the list.
  */
 static bool
-NeedsDistributedPlanningWalker(Node *node, void *context)
+ListContainsDistributedTableRTE(List *rangeTableList)
 {
-    if (node == NULL)
-    {
-        return false;
-    }
-
-    if (IsA(node, Query))
-    {
-        Query *query = (Query *) node;
     ListCell *rangeTableCell = NULL;

-    foreach(rangeTableCell, query->rtable)
+    foreach(rangeTableCell, rangeTableList)
     {
         RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);

-        Oid relationId = rangeTableEntry->relid;
-        if (IsDistributedTable(relationId))
+        if (rangeTableEntry->rtekind != RTE_RELATION)
         {
+            continue;
+        }
+
+        if (IsDistributedTable(rangeTableEntry->relid))
+        {
             return true;
         }
     }

-    return query_tree_walker(query, NeedsDistributedPlanningWalker, NULL, 0);
-    }
-    else
-    {
-        return expression_tree_walker(node, NeedsDistributedPlanningWalker, NULL);
-    }
+    return false;
 }


@@ -283,15 +299,11 @@ NeedsDistributedPlanningWalker(Node *node, void *context)
  * our logic.
  */
 static void
-AssignRTEIdentities(Query *queryTree)
+AssignRTEIdentities(List *rangeTableList)
 {
-    List *rangeTableList = NIL;
     ListCell *rangeTableCell = NULL;
     int rteIdentifier = 1;

-    /* extract range table entries for simple relations only */
-    ExtractRangeTableEntryWalker((Node *) queryTree, &rangeTableList);
-
     foreach(rangeTableCell, rangeTableList)
     {
         RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);

@@ -325,15 +337,11 @@ AssignRTEIdentities(Query *queryTree)
  * our logic.
  */
 static void
-AdjustPartitioningForDistributedPlanning(Query *queryTree,
+AdjustPartitioningForDistributedPlanning(List *rangeTableList,
                                          bool setPartitionedTablesInherited)
 {
-    List *rangeTableList = NIL;
     ListCell *rangeTableCell = NULL;

-    /* extract range table entries for simple relations only */
-    ExtractRangeTableEntryWalker((Node *) queryTree, &rangeTableList);
-
     foreach(rangeTableCell, rangeTableList)
     {
         RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);

@@ -344,7 +352,8 @@ AdjustPartitioningForDistributedPlanning(Query *queryTree,
          * we set each distributed partitioned table's inh flag to appropriate
          * value before and after dropping to the standard_planner.
          */
-        if (IsDistributedTable(rangeTableEntry->relid) &&
+        if (rangeTableEntry->rtekind == RTE_RELATION &&
+            IsDistributedTable(rangeTableEntry->relid) &&
             PartitionedTable(rangeTableEntry->relid))
         {
             rangeTableEntry->inh = setPartitionedTablesInherited;

@@ -724,7 +733,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
      * distributed_planner, but on a copy of the original query, so we need
      * to do it again here.
      */
-    AdjustPartitioningForDistributedPlanning(newQuery, setPartitionedTablesInherited);
+    AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery),
+                                             setPartitionedTablesInherited);

     /*
      * Some relations may have been removed from the query, but we can skip

@@ -507,7 +507,7 @@ ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
     StringInfo nodeAddress = makeStringInfo();
     char *nodeName = taskPlacement->nodeName;
     uint32 nodePort = taskPlacement->nodePort;
-    char *nodeDatabase = get_database_name(MyDatabaseId);
+    char *nodeDatabase = CurrentDatabaseName();
     ListCell *explainOutputCell = NULL;
     int rowIndex = 0;

@@ -2026,9 +2026,30 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList)
     }
     else if (IsA(node, Query))
     {
+        Query *query = (Query *) node;
+
+        /*
+         * Since we're only interested in range table entries, we only descend
+         * into all parts of the query when it is necessary. Otherwise, it is
+         * sufficient to descend into the range table list since it's the only
+         * part of the query that could contain range table entries.
+         */
+        if (query->hasSubLinks || query->cteList || query->setOperations)
+        {
+            /* descend into all parts of the query */
             walkIsComplete = query_tree_walker((Query *) node,
                                                ExtractRangeTableEntryWalker,
-                                               rangeTableList, QTW_EXAMINE_RTES);
+                                               rangeTableList,
+                                               QTW_EXAMINE_RTES);
+        }
+        else
+        {
+            /* descend only into RTEs */
+            walkIsComplete = range_table_walker(query->rtable,
+                                                ExtractRangeTableEntryWalker,
+                                                rangeTableList,
+                                                QTW_EXAMINE_RTES);
+        }
     }
     else
     {

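The new walker branch pays for a full query_tree_walker() descent only when sublinks, CTEs, or set operations could hide range table entries outside query->rtable (which is also why SubqueryPushdownMultiNodeTree below starts preserving hasSubLinks). A greatly simplified sketch of the short-circuit, using toy types rather than PostgreSQL's; the real code's cheap path, range_table_walker, also recurses into subquery RTEs:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct Rte { int relid; struct Rte *next; } Rte;

    typedef struct Query
    {
        bool hasSubLinks;       /* subqueries in expressions */
        bool hasCtes;           /* WITH clauses */
        bool hasSetOperations;  /* UNION/INTERSECT/EXCEPT */
        Rte *rtable;            /* top-level range table */
        struct Query *subquery; /* stand-in for "everything else" */
    } Query;

    /* cheap path: only the rtable can contain RTEs, so scan just that */
    static void
    ScanRtableOnly(Query *query, Rte **out, size_t *count)
    {
        for (Rte *rte = query->rtable; rte != NULL; rte = rte->next)
        {
            out[(*count)++] = rte;
        }
    }

    static void
    CollectRtes(Query *query, Rte **out, size_t *count)
    {
        if (query == NULL)
        {
            return;
        }

        ScanRtableOnly(query, out, count);

        /* expensive full descent only when RTEs can hide elsewhere */
        if (query->hasSubLinks || query->hasCtes || query->hasSetOperations)
        {
            CollectRtes(query->subquery, out, count);
        }
    }

    int
    main(void)
    {
        Rte r1 = { 42, NULL };
        Query q = { false, false, false, &r1, NULL };
        Rte *out[8];
        size_t count = 0;

        CollectRtes(&q, out, &count);
        printf("%zu\n", count); /* prints 1: cheap path only */
        return 0;
    }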
@@ -2552,8 +2552,7 @@ CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterv
         ShardPlacement *secondShardPlacement = (ShardPlacement *) lfirst(
             secondShardPlacementCell);

-        if (strcmp(firstShardPlacement->nodeName, secondShardPlacement->nodeName) != 0 ||
-            firstShardPlacement->nodePort != secondShardPlacement->nodePort)
+        if (firstShardPlacement->nodeId != secondShardPlacement->nodeId)
         {
             return false;
         }

@@ -5375,6 +5374,7 @@ AssignDualHashTaskList(List *taskList)
         ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
         taskPlacement->nodeName = pstrdup(workerNode->workerName);
         taskPlacement->nodePort = workerNode->workerPort;
+        taskPlacement->nodeId = workerNode->nodeId;

         taskPlacementList = lappend(taskPlacementList, taskPlacement);
     }

@@ -1543,16 +1543,18 @@ CreateTask(TaskType taskType)
  * ExtractFirstDistributedTableId takes a given query, and finds the relationId
  * for the first distributed table in that query. If the function cannot find a
  * distributed table, it returns InvalidOid.
+ *
+ * We only use this function for modifications and fast path queries, which
+ * should have the first distributed table in the top-level rtable.
  */
 Oid
 ExtractFirstDistributedTableId(Query *query)
 {
-    List *rangeTableList = NIL;
+    List *rangeTableList = query->rtable;
     ListCell *rangeTableCell = NULL;
     Oid distributedTableId = InvalidOid;

-    /* extract range table entries */
-    ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
+    Assert(IsModifyCommand(query) || FastPathRouterQuery(query));

     foreach(rangeTableCell, rangeTableList)
     {

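ExtractFirstDistributedTableId drops its recursive walk and reads query->rtable directly; the new Assert records the narrowed contract that only modifications and fast path queries, whose first distributed table sits in the top-level rtable, may call it. A sketch of this assert-guarded fast path under hypothetical names:

    #include <assert.h>
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct Table { int id; bool distributed; } Table;

    typedef struct Query
    {
        bool isModify;       /* stands in for IsModifyCommand() */
        Table *rtable;       /* top-level range table only */
        size_t rtableLength;
    } Query;

    /*
     * Callers guarantee the first distributed table is in the top-level
     * rtable, so a flat scan suffices; the assert enforces that contract.
     */
    static int
    FirstDistributedTableId(const Query *query)
    {
        assert(query->isModify);

        for (size_t i = 0; i < query->rtableLength; i++)
        {
            if (query->rtable[i].distributed)
            {
                return query->rtable[i].id;
            }
        }

        return -1; /* stands in for InvalidOid */
    }

    int
    main(void)
    {
        Table tables[] = { { 10, false }, { 20, true } };
        Query query = { true, tables, 2 };

        printf("%d\n", FirstDistributedTableId(&query)); /* prints 20 */
        return 0;
    }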
@@ -2027,6 +2029,7 @@ PlanRouterQuery(Query *originalQuery,
             (ShardPlacement *) CitusMakeNode(ShardPlacement);
         dummyPlacement->nodeName = workerNode->workerName;
         dummyPlacement->nodePort = workerNode->workerPort;
+        dummyPlacement->nodeId = workerNode->nodeId;
         dummyPlacement->groupId = workerNode->groupId;

         workerList = lappend(workerList, dummyPlacement);

@@ -1505,6 +1505,7 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
     pushedDownQuery->rtable = copyObject(queryTree->rtable);
     pushedDownQuery->setOperations = copyObject(queryTree->setOperations);
     pushedDownQuery->querySource = queryTree->querySource;
+    pushedDownQuery->hasSubLinks = queryTree->hasSubLinks;

     subqueryNode = MultiSubqueryPushdownTable(pushedDownQuery);

@@ -200,6 +200,7 @@ CopyNodeShardPlacement(COPYFUNC_ARGS)
     COPY_SCALAR_FIELD(groupId);
     COPY_STRING_FIELD(nodeName);
     COPY_SCALAR_FIELD(nodePort);
+    COPY_SCALAR_FIELD(nodeId);
     COPY_SCALAR_FIELD(partitionMethod);
     COPY_SCALAR_FIELD(colocationGroupId);
     COPY_SCALAR_FIELD(representativeValue);

@@ -405,6 +405,7 @@ OutShardPlacement(OUTFUNC_ARGS)
     WRITE_INT_FIELD(groupId);
     WRITE_STRING_FIELD(nodeName);
     WRITE_UINT_FIELD(nodePort);
+    WRITE_INT_FIELD(nodeId);
     /* so we can deal with 0 */
     WRITE_INT_FIELD(partitionMethod);
     WRITE_UINT_FIELD(colocationGroupId);

@@ -315,6 +315,7 @@ ReadShardPlacement(READFUNC_ARGS)
     READ_INT_FIELD(groupId);
     READ_STRING_FIELD(nodeName);
     READ_UINT_FIELD(nodePort);
+    READ_INT_FIELD(nodeId);
     /* so we can deal with 0 */
     READ_INT_FIELD(partitionMethod);
     READ_UINT_FIELD(colocationGroupId);

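Adding nodeId to ShardPlacement means updating the node's copy, out (serialize), and read (deserialize) functions together, with the field emitted and parsed in the same position in all three. A toy round-trip illustrating why the trio must stay in lockstep; the types and text format here are illustrative, not the Citus node serialization:

    #include <stdio.h>

    typedef struct Placement
    {
        int groupId;
        unsigned int nodePort;
        int nodeId; /* the newly added field */
    } Placement;

    /* out: fields written in a fixed order... */
    static void
    OutPlacement(char *buf, size_t len, const Placement *p)
    {
        snprintf(buf, len, "%d %u %d", p->groupId, p->nodePort, p->nodeId);
    }

    /* ...read: and parsed back in exactly the same order */
    static void
    ReadPlacement(const char *buf, Placement *p)
    {
        sscanf(buf, "%d %u %d", &p->groupId, &p->nodePort, &p->nodeId);
    }

    int
    main(void)
    {
        Placement original = { 7, 5432, 3 };
        Placement roundTripped;
        char buf[64];

        OutPlacement(buf, sizeof(buf), &original);
        ReadPlacement(buf, &roundTripped);

        printf("%d\n", roundTripped.nodeId); /* prints 3 */
        return 0;
    }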
@@ -420,25 +420,12 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement)
     const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
     const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);

-    char *leftNodeName = leftPlacement->nodeName;
-    char *rightNodeName = rightPlacement->nodeName;
-
-    uint32 leftNodePort = leftPlacement->nodePort;
-    uint32 rightNodePort = rightPlacement->nodePort;
-
-    /* first compare node names */
-    int nodeNameCompare = strncmp(leftNodeName, rightNodeName, WORKER_LENGTH);
-    if (nodeNameCompare != 0)
-    {
-        return nodeNameCompare;
-    }
-
     /* if node names are same, check node ports */
-    if (leftNodePort < rightNodePort)
+    if (leftPlacement->nodeId < rightPlacement->nodeId)
     {
         return -1;
     }
-    else if (leftNodePort > rightNodePort)
+    else if (leftPlacement->nodeId > rightPlacement->nodeId)
     {
         return 1;
     }

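With nodeId available on every placement, ordering placements by node becomes a three-way integer comparison instead of a strncmp over up to WORKER_LENGTH bytes plus a port tiebreak. A standalone qsort-compatible comparator in the same style, over a hypothetical Placement type:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct Placement
    {
        int nodeId;
    } Placement;

    /* three-way comparison on the integer node id, qsort-compatible */
    static int
    ComparePlacementsByNode(const void *leftElement, const void *rightElement)
    {
        const Placement *left = *((const Placement *const *) leftElement);
        const Placement *right = *((const Placement *const *) rightElement);

        if (left->nodeId < right->nodeId)
        {
            return -1;
        }
        else if (left->nodeId > right->nodeId)
        {
            return 1;
        }

        return 0;
    }

    int
    main(void)
    {
        Placement a = { 3 }, b = { 1 }, c = { 2 };
        Placement *placements[] = { &a, &b, &c };

        qsort(placements, 3, sizeof(Placement *), ComparePlacementsByNode);

        for (int i = 0; i < 3; i++)
        {
            printf("%d\n", placements[i]->nodeId); /* prints 1 2 3 */
        }
        return 0;
    }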
@@ -24,6 +24,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "citus_version.h"
+#include "commands/dbcommands.h"
 #include "commands/extension.h"
 #include "commands/trigger.h"
 #include "distributed/colocation_utils.h"

@@ -137,6 +138,8 @@ typedef struct MetadataCacheData
     Oid unavailableNodeRoleId;
     Oid pgTableIsVisibleFuncId;
     Oid citusTableIsVisibleFuncId;
+    bool databaseNameValid;
+    char databaseName[NAMEDATALEN];
 } MetadataCacheData;


@@ -526,6 +529,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,

     shardPlacement->nodeName = pstrdup(workerNode->workerName);
     shardPlacement->nodePort = workerNode->workerPort;
+    shardPlacement->nodeId = workerNode->nodeId;

     /* fill in remaining fields */
     Assert(tableEntry->partitionMethod != 0);

@@ -2101,6 +2105,33 @@ CitusTableVisibleFuncId(void)
 }


+/*
+ * CurrentDatabaseName gets the name of the current database and caches
+ * the result.
+ *
+ * Given that the database name cannot be changed when there is at least
+ * one session connected to it, we do not need to implement any invalidation
+ * mechanism.
+ */
+char *
+CurrentDatabaseName(void)
+{
+    if (!MetadataCache.databaseNameValid)
+    {
+        char *databaseName = get_database_name(MyDatabaseId);
+        if (databaseName == NULL)
+        {
+            return NULL;
+        }
+
+        strlcpy(MetadataCache.databaseName, databaseName, NAMEDATALEN);
+        MetadataCache.databaseNameValid = true;
+    }
+
+    return MetadataCache.databaseName;
+}
+
+
 /*
  * CitusExtensionOwner() returns the owner of the 'citus' extension. That user
  * is, amongst others, used to perform actions a normal user might not be

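CurrentDatabaseName() is plain memoization: the first call pays for get_database_name(), later calls return the cached copy, and no invalidation is needed because a database cannot be renamed while sessions are connected to it. A self-contained sketch of the pattern; expensive_lookup stands in for the catalog lookup, and strncpy with explicit termination stands in for strlcpy:

    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>

    #define NAMEDATALEN 64

    /* stand-in for the expensive catalog lookup (get_database_name) */
    static const char *
    expensive_lookup(void)
    {
        printf("performing expensive lookup\n");
        return "mydatabase";
    }

    static bool databaseNameValid = false;
    static char databaseName[NAMEDATALEN];

    /* returns the cached name, computing it on the first call only */
    static const char *
    CurrentDatabaseNameCached(void)
    {
        if (!databaseNameValid)
        {
            const char *name = expensive_lookup();
            if (name == NULL)
            {
                return NULL;
            }

            strncpy(databaseName, name, NAMEDATALEN - 1);
            databaseName[NAMEDATALEN - 1] = '\0';
            databaseNameValid = true;
        }

        return databaseName;
    }

    int
    main(void)
    {
        /* the lookup message prints once; the second call hits the cache */
        printf("%s\n", CurrentDatabaseNameCached());
        printf("%s\n", CurrentDatabaseNameCached());
        return 0;
    }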
@@ -368,7 +368,7 @@ CreateTask(uint64 jobId, uint32 taskId, char *taskCallString)
 {
     WorkerTask *workerTask = NULL;
     uint32 assignmentTime = 0;
-    char *databaseName = get_database_name(MyDatabaseId);
+    char *databaseName = CurrentDatabaseName();
     char *userName = CurrentUserName();

     /* increase task priority for cleanup tasks */

@@ -248,7 +248,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort, const char *nodeUser,
     }

     /* we use the same database name on the master and worker nodes */
-    nodeDatabase = get_database_name(MyDatabaseId);
+    nodeDatabase = CurrentDatabaseName();

     /* connect to remote node */
     connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, nodeUser);

@@ -83,6 +83,7 @@ typedef struct RelationRowLock

 extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
                                          ParamListInfo boundParams);
+extern List * ExtractRangeTableEntryList(Query *query);
 extern bool NeedsDistributedPlanning(Query *query);
 extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
 extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,

@@ -99,6 +99,7 @@ typedef struct ShardPlacement
     /* the rest of the fields aren't from pg_dist_placement */
     char *nodeName;
     uint32 nodePort;
+    int nodeId;
     char partitionMethod;
     uint32 colocationGroupId;
     uint32 representativeValue;

@@ -167,4 +167,7 @@ extern Oid BinaryCopyFormatId(void);
 extern Oid CitusExtensionOwner(void);
 extern char * CitusExtensionOwnerName(void);
 extern char * CurrentUserName(void);
+extern char * CurrentDatabaseName(void);
+
+
 #endif /* METADATA_CACHE_H */