mirror of https://github.com/citusdata/citus.git
Add explicit RelationShards mapping to tasks
parent b7d0a3237b
commit d745d7bf70
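In short, the commit replaces the bare selectShardList on each Task with an explicit relation-to-shard mapping that both the deparser and the executor's locking code consume. A minimal sketch of the new node and of how the router planner fills it, using only names that appear in the diff below; the AddRelationShardMapping helper itself is hypothetical and only illustrates the pattern:

/* new Citus node introduced by this commit (declared in multi_planner.h below) */
typedef struct RelationShard
{
	CitusNode type;      /* node tag, T_RelationShard */
	Oid relationId;      /* distributed relation */
	uint64 shardId;      /* shard the relation was pruned to */
} RelationShard;

/* hypothetical helper: record one pruned shard in a task's mapping */
static void
AddRelationShardMapping(Task *task, ShardInterval *shardInterval)
{
	RelationShard *relationShard = CitusMakeNode(RelationShard);

	relationShard->relationId = shardInterval->relationId;
	relationShard->shardId = shardInterval->shardId;

	/* tasks now carry relation-to-shard pairs instead of a bare shard list */
	task->relationShardList = lappend(task->relationShardList, relationShard);
}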
@@ -382,7 +382,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
 		 * concurrently.
 		 */
 
-		LockShardListResources(task->selectShardList, ExclusiveLock);
+		LockRelationShardListResources(task->relationShardList, ExclusiveLock);
 	}
 }
 
@@ -462,7 +462,7 @@ AcquireExecutorMultiShardLocks(List *taskList)
 			 * concurrently.
 			 */
 
-			LockShardListResources(task->selectShardList, ExclusiveLock);
+			LockRelationShardListResources(task->relationShardList, ExclusiveLock);
 		}
 	}
 }

@@ -0,0 +1,238 @@
+/*-------------------------------------------------------------------------
+ *
+ * deparse_shard_query.c
+ *
+ * This file contains functions for deparsing shard queries.
+ *
+ * Copyright (c) 2014-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "c.h"
+
+#include "access/heapam.h"
+#include "distributed/citus_nodefuncs.h"
+#include "distributed/citus_ruleutils.h"
+#include "distributed/deparse_shard_query.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/multi_physical_planner.h"
+#include "distributed/multi_router_planner.h"
+#include "lib/stringinfo.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodeFuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/parsenodes.h"
+#include "nodes/pg_list.h"
+#include "storage/lock.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+
+
+static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
+
+
+/*
+ * RebuildQueryStrings deparses the job query for each task to
+ * include execution-time changes such as function evaluation.
+ */
+void
+RebuildQueryStrings(Query *originalQuery, List *taskList)
+{
+	ListCell *taskCell = NULL;
+	Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
+
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+		StringInfo newQueryString = makeStringInfo();
+		Query *query = originalQuery;
+
+		if (task->insertSelectQuery)
+		{
+			/* for INSERT..SELECT, adjust shard names in SELECT part */
+			RangeTblEntry *copiedInsertRte = NULL;
+			RangeTblEntry *copiedSubqueryRte = NULL;
+			Query *copiedSubquery = NULL;
+			List *relationShardList = task->relationShardList;
+			ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
+
+			query = copyObject(originalQuery);
+
+			copiedInsertRte = ExtractInsertRangeTableEntry(query);
+			copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
+			copiedSubquery = copiedSubqueryRte->subquery;
+
+			AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
+			ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte);
+
+			/* setting an alias simplifies deparsing of RETURNING */
+			if (copiedInsertRte->alias == NULL)
+			{
+				Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
+				copiedInsertRte->alias = alias;
+			}
+
+			UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
+		}
+
+		deparse_shard_query(query, relationId, task->anchorShardId,
+							newQueryString);
+
+		ereport(DEBUG4, (errmsg("query before rebuilding: %s",
+								task->queryString)));
+		ereport(DEBUG4, (errmsg("query after rebuilding: %s",
+								newQueryString->data)));
+
+		task->queryString = newQueryString->data;
+	}
+}
+
+
+/*
+ * UpdateRelationToShardNames walks over the query tree and appends shard ids to
+ * relations. It uses unique identity value to establish connection between a
+ * shard and the range table entry. If the range table id is not given a
+ * identity, than the relation is not referenced from the query, no connection
+ * could be found between a shard and this relation. Therefore relation is replaced
+ * by set of NULL values so that the query would work at worker without any problems.
+ *
+ */
+bool
+UpdateRelationToShardNames(Node *node, List *relationShardList)
+{
+	RangeTblEntry *newRte = NULL;
+	uint64 shardId = INVALID_SHARD_ID;
+	Oid relationId = InvalidOid;
+	Oid schemaId = InvalidOid;
+	char *relationName = NULL;
+	char *schemaName = NULL;
+	bool replaceRteWithNullValues = false;
+	ListCell *relationShardCell = NULL;
+	RelationShard *relationShard = NULL;
+
+	if (node == NULL)
+	{
+		return false;
+	}
+
+	/* want to look at all RTEs, even in subqueries, CTEs and such */
+	if (IsA(node, Query))
+	{
+		return query_tree_walker((Query *) node, UpdateRelationToShardNames,
+								 relationShardList, QTW_EXAMINE_RTES);
+	}
+
+	if (!IsA(node, RangeTblEntry))
+	{
+		return expression_tree_walker(node, UpdateRelationToShardNames,
+									  relationShardList);
+	}
+
+	newRte = (RangeTblEntry *) node;
+
+	if (newRte->rtekind != RTE_RELATION)
+	{
+		return false;
+	}
+
+	/*
+	 * Search for the restrictions associated with the RTE. There better be
+	 * some, otherwise this query wouldn't be elegible as a router query.
+	 *
+	 * FIXME: We should probably use a hashtable here, to do efficient
+	 * lookup.
+	 */
+	foreach(relationShardCell, relationShardList)
+	{
+		relationShard = (RelationShard *) lfirst(relationShardCell);
+
+		if (newRte->relid == relationShard->relationId)
+		{
+			break;
+		}
+
+		relationShard = NULL;
+	}
+
+	replaceRteWithNullValues = relationShard == NULL ||
+							   relationShard->shardId == INVALID_SHARD_ID;
+	if (replaceRteWithNullValues)
+	{
+		ConvertRteToSubqueryWithEmptyResult(newRte);
+		return false;
+	}
+
+	shardId = relationShard->shardId;
+	relationId = relationShard->relationId;
+
+	relationName = get_rel_name(relationId);
+	AppendShardIdToName(&relationName, shardId);
+
+	schemaId = get_rel_namespace(relationId);
+	schemaName = get_namespace_name(schemaId);
+
+	ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL);
+
+	return false;
+}
+
+
+/*
+ * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into
+ * subquery RTE that returns no results.
+ */
+static void
+ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte)
+{
+	Relation relation = heap_open(rte->relid, NoLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(relation);
+	int columnCount = tupleDescriptor->natts;
+	int columnIndex = 0;
+	Query *subquery = NULL;
+	List *targetList = NIL;
+	FromExpr *joinTree = NULL;
+
+	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
+	{
+		FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[columnIndex];
+		TargetEntry *targetEntry = NULL;
+		StringInfo resname = NULL;
+		Const *constValue = NULL;
+
+		if (attributeForm->attisdropped)
+		{
+			continue;
+		}
+
+		resname = makeStringInfo();
+		constValue = makeNullConst(attributeForm->atttypid, attributeForm->atttypmod,
+								   attributeForm->attcollation);
+
+		appendStringInfo(resname, "%s", attributeForm->attname.data);
+
+		targetEntry = makeNode(TargetEntry);
+		targetEntry->expr = (Expr *) constValue;
+		targetEntry->resno = columnIndex;
+		targetEntry->resname = resname->data;
+
+		targetList = lappend(targetList, targetEntry);
+	}
+
+	heap_close(relation, NoLock);
+
+	joinTree = makeNode(FromExpr);
+	joinTree->quals = makeBoolConst(false, false);
+
+	subquery = makeNode(Query);
+	subquery->commandType = CMD_SELECT;
+	subquery->querySource = QSRC_ORIGINAL;
+	subquery->canSetTag = true;
+	subquery->targetList = targetList;
+	subquery->jointree = joinTree;
+
+	rte->rtekind = RTE_SUBQUERY;
+	rte->subquery = subquery;
+	rte->alias = copyObject(rte->eref);
+}

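The new file above is the deparse side of the change. As a rough illustration of how its pieces fit together for a single task (every call below appears in the file above with this signature; the wrapper function and its name are assumptions, not part of the commit):

/* hypothetical wrapper: rebuild one task's query string from its shard mapping */
static void
DeparseTaskQueryForShards(Query *originalQuery, Task *task)
{
	StringInfo shardQueryString = makeStringInfo();
	Query *shardQuery = copyObject(originalQuery);
	Oid relationId = ((RangeTblEntry *) linitial(shardQuery->rtable))->relid;

	/* rename every relation RTE to the shard recorded in the task's mapping */
	UpdateRelationToShardNames((Node *) shardQuery, task->relationShardList);

	/* deparse against the task's anchor shard, as RebuildQueryStrings does */
	deparse_shard_query(shardQuery, relationId, task->anchorShardId,
						shardQueryString);
	task->queryString = shardQueryString->data;
}

Relations that are missing from the mapping are turned into empty-result subqueries by ConvertRteToSubqueryWithEmptyResult, so the generated worker query still parses even when a shard was pruned away.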
@@ -1478,6 +1478,11 @@ GetRTEIdentity(RangeTblEntry *rte)
 	Assert(IsA(rte->values_lists, IntList));
 	Assert(list_length(rte->values_lists) == 1);
 
+	if (rte->values_lists == NULL)
+	{
+		return 0;
+	}
+
 	return linitial_int(rte->values_lists);
 }

@@ -23,6 +23,7 @@
 #include "distributed/colocation_utils.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/citus_nodefuncs.h"
+#include "distributed/deparse_shard_query.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_join_order.h"

@@ -82,10 +83,6 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
 												RelationRestrictionContext *
 												restrictionContext,
 												uint32 taskIdIndex);
-static void AddShardIntervalRestrictionToSelect(Query *subqery,
-												ShardInterval *shardInterval);
-static RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
-static RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
 static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
 										bool *badCoalesce);
 static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);

@@ -105,13 +102,12 @@ static Task * RouterSelectTask(Query *originalQuery,
 static bool RouterSelectQuery(Query *originalQuery,
 							  RelationRestrictionContext *restrictionContext,
 							  List **placementList, uint64 *anchorShardId,
-							  List **selectShardList, bool replacePrunedQueryWithDummy);
+							  List **relationShardList, bool replacePrunedQueryWithDummy);
+static bool RelationPrunesToMultipleShards(List *relationShardList);
 static List * TargetShardIntervalsForSelect(Query *query,
 											RelationRestrictionContext *restrictionContext);
 static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
 static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
-static bool UpdateRelationNames(Node *node,
-								RelationRestrictionContext *restrictionContext);
 static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
 static bool MultiRouterPlannableQuery(Query *query,
 									  RelationRestrictionContext *restrictionContext);

@@ -295,7 +291,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
 	workerJob->jobQuery = originalQuery;
 
 	/* for now we do not support any function evaluation */
-	workerJob->requiresMasterEvaluation = false;
+	workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
 
 	/* and finally the multi plan */
 	multiPlan = CitusMakeNode(MultiPlan);

@@ -339,7 +335,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
 	Task *modifyTask = NULL;
 	List *selectPlacementList = NIL;
 	uint64 selectAnchorShardId = INVALID_SHARD_ID;
-	List *selectShardList = NIL;
+	List *relationShardList = NIL;
 	uint64 jobId = INVALID_JOB_ID;
 	List *insertShardPlacementList = NULL;
 	List *intersectedPlacementList = NULL;

@@ -398,7 +394,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
 	 */
 	routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext,
 										&selectPlacementList, &selectAnchorShardId,
-										&selectShardList, replacePrunedQueryWithDummy);
+										&relationShardList, replacePrunedQueryWithDummy);
 
 	if (!routerPlannable)
 	{

@@ -463,7 +459,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
 	modifyTask->anchorShardId = shardId;
 	modifyTask->taskPlacementList = insertShardPlacementList;
 	modifyTask->upsertQuery = upsertQuery;
-	modifyTask->selectShardList = selectShardList;
+	modifyTask->relationShardList = relationShardList;
 
 	return modifyTask;
 }

@@ -479,7 +475,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
  * The function expects and asserts that subquery's target list contains a partition
  * column value. Thus, this function should never be called with reference tables.
  */
-static void
+void
 AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval)
 {
 	List *targetList = subqery->targetList;

@@ -594,7 +590,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval
  * Note that the function expects and asserts that the input query be
 * an INSERT...SELECT query.
 */
-static RangeTblEntry *
+RangeTblEntry *
 ExtractSelectRangeTableEntry(Query *query)
 {
 	List *fromList = NULL;

@@ -617,7 +613,7 @@ ExtractSelectRangeTableEntry(Query *query)
 * Note that the function expects and asserts that the input query be
 * an INSERT...SELECT query.
 */
-static RangeTblEntry *
+RangeTblEntry *
 ExtractInsertRangeTableEntry(Query *query)
 {
 	int resultRelation = query->resultRelation;

@@ -650,13 +646,13 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
 
 	subquery = subqueryRte->subquery;
 
-	if (contain_mutable_functions((Node *) queryTree))
+	if (contain_volatile_functions((Node *) queryTree))
 	{
 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 						errmsg("cannot perform distributed planning for the given "
 							   "modification"),
 						errdetail(
-							"Stable and volatile functions are not allowed in INSERT ... "
+							"Volatile functions are not allowed in INSERT ... "
 							"SELECT queries")));
 	}

@@ -1917,14 +1913,14 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
 	StringInfo queryString = makeStringInfo();
 	bool upsertQuery = false;
 	uint64 shardId = INVALID_SHARD_ID;
-	List *selectShardList = NIL;
+	List *relationShardList = NIL;
 	bool replacePrunedQueryWithDummy = false;
 
 	/* router planner should create task even if it deosn't hit a shard at all */
 	replacePrunedQueryWithDummy = true;
 
 	queryRoutable = RouterSelectQuery(originalQuery, restrictionContext,
-									  placementList, &shardId, &selectShardList,
+									  placementList, &shardId, &relationShardList,
 									  replacePrunedQueryWithDummy);
 
 

@@ -1943,6 +1939,7 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
 	task->anchorShardId = shardId;
 	task->dependedTaskList = NIL;
 	task->upsertQuery = upsertQuery;
+	task->relationShardList = relationShardList;
 
 	return task;
 }

@@ -1954,12 +1951,14 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
 *
 * On return true, all RTEs have been updated to point to the relevant shards in
 * the originalQuery. Also, placementList is filled with the list of worker nodes
-* that has all the required shard placements for the query execution. Finally,
-* anchorShardId is set to the first pruned shardId of the given query.
+* that has all the required shard placements for the query execution.
+* anchorShardId is set to the first pruned shardId of the given query. Finally,
+* relationShardList is filled with the list of relation-to-shard mappings for
+* the query.
 */
 static bool
 RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
-				  List **placementList, uint64 *anchorShardId, List **selectShardList,
+				  List **placementList, uint64 *anchorShardId, List **relationShardList,
 				  bool replacePrunedQueryWithDummy)
 {
 	List *prunedRelationShardList = TargetShardIntervalsForSelect(originalQuery,

@@ -1982,7 +1981,9 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
 	foreach(prunedRelationShardListCell, prunedRelationShardList)
 	{
 		List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
 		ShardInterval *shardInterval = NULL;
+		RelationShard *relationShard = NULL;
 
 		/* no shard is present or all shards are pruned out case will be handled later */
 		if (prunedShardList == NIL)

@@ -2003,7 +2004,21 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
 			shardId = shardInterval->shardId;
 		}
 
-		*selectShardList = lappend(*selectShardList, shardInterval);
+		/* add relation to shard mapping */
+		relationShard = CitusMakeNode(RelationShard);
+		relationShard->relationId = shardInterval->relationId;
+		relationShard->shardId = shardInterval->shardId;
+
+		*relationShardList = lappend(*relationShardList, relationShard);
+	}
+
+	/*
+	 * We bail out if there are RTEs that prune multiple shards above, but
+	 * there can also be multiple RTEs that reference the same relation.
+	 */
+	if (RelationPrunesToMultipleShards(*relationShardList))
+	{
+		return false;
 	}
 
 	/*

@@ -2049,7 +2064,7 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
 		return false;
 	}
 
-	UpdateRelationNames((Node *) originalQuery, restrictionContext);
+	UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
 
 	*placementList = workerList;
 	*anchorShardId = shardId;

@@ -2137,6 +2152,37 @@ TargetShardIntervalsForSelect(Query *query,
 }
 
 
+/*
+ * RelationPrunesToMultipleShards returns true if the given list of
+ * relation-to-shard mappings contains at least two mappings with
+ * the same relation, but different shards.
+ */
+static bool
+RelationPrunesToMultipleShards(List *relationShardList)
+{
+	ListCell *relationShardCell = NULL;
+	RelationShard *previousRelationShard = NULL;
+
+	relationShardList = SortList(relationShardList, CompareRelationShards);
+
+	foreach(relationShardCell, relationShardList)
+	{
+		RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
+
+		if (previousRelationShard != NULL &&
+			relationShard->relationId == previousRelationShard->relationId &&
+			relationShard->shardId != previousRelationShard->shardId)
+		{
+			return true;
+		}
+
+		previousRelationShard = relationShard;
+	}
+
+	return false;
+}
+
+
 /*
  * WorkersContainingAllShards returns list of shard placements that contain all
  * shard intervals provided to the function. It returns NIL if no placement exists.

@@ -2233,164 +2279,6 @@ IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList)
 }
 
 
-/*
- * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into
- * subquery RTE that returns no results.
- */
-static void
-ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte)
-{
-	Relation relation = heap_open(rte->relid, NoLock);
-	TupleDesc tupleDescriptor = RelationGetDescr(relation);
-	int columnCount = tupleDescriptor->natts;
-	int columnIndex = 0;
-	Query *subquery = NULL;
-	List *targetList = NIL;
-	FromExpr *joinTree = NULL;
-
-	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
-	{
-		FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[columnIndex];
-		TargetEntry *targetEntry = NULL;
-		StringInfo resname = NULL;
-		Const *constValue = NULL;
-
-		if (attributeForm->attisdropped)
-		{
-			continue;
-		}
-
-		resname = makeStringInfo();
-		constValue = makeNullConst(attributeForm->atttypid, attributeForm->atttypmod,
-								   attributeForm->attcollation);
-
-		appendStringInfo(resname, "%s", attributeForm->attname.data);
-
-		targetEntry = makeNode(TargetEntry);
-		targetEntry->expr = (Expr *) constValue;
-		targetEntry->resno = columnIndex;
-		targetEntry->resname = resname->data;
-
-		targetList = lappend(targetList, targetEntry);
-	}
-
-	heap_close(relation, NoLock);
-
-	joinTree = makeNode(FromExpr);
-	joinTree->quals = makeBoolConst(false, false);
-
-	subquery = makeNode(Query);
-	subquery->commandType = CMD_SELECT;
-	subquery->querySource = QSRC_ORIGINAL;
-	subquery->canSetTag = true;
-	subquery->targetList = targetList;
-	subquery->jointree = joinTree;
-
-	rte->rtekind = RTE_SUBQUERY;
-	rte->subquery = subquery;
-	rte->alias = copyObject(rte->eref);
-}
-
-
-/*
- * UpdateRelationNames walks over the query tree and appends shard ids to
- * relations. It uses unique identity value to establish connection between a
- * shard and the range table entry. If the range table id is not given a
- * identity, than the relation is not referenced from the query, no connection
- * could be found between a shard and this relation. Therefore relation is replaced
- * by set of NULL values so that the query would work at worker without any problems.
- *
- */
-static bool
-UpdateRelationNames(Node *node, RelationRestrictionContext *restrictionContext)
-{
-	RangeTblEntry *newRte = NULL;
-	uint64 shardId = INVALID_SHARD_ID;
-	Oid relationId = InvalidOid;
-	Oid schemaId = InvalidOid;
-	char *relationName = NULL;
-	char *schemaName = NULL;
-	ListCell *relationRestrictionCell = NULL;
-	RelationRestriction *relationRestriction = NULL;
-	List *shardIntervalList = NIL;
-	ShardInterval *shardInterval = NULL;
-	bool replaceRteWithNullValues = false;
-
-	if (node == NULL)
-	{
-		return false;
-	}
-
-	/* want to look at all RTEs, even in subqueries, CTEs and such */
-	if (IsA(node, Query))
-	{
-		return query_tree_walker((Query *) node, UpdateRelationNames, restrictionContext,
-								 QTW_EXAMINE_RTES);
-	}
-
-	if (!IsA(node, RangeTblEntry))
-	{
-		return expression_tree_walker(node, UpdateRelationNames, restrictionContext);
-	}
-
-
-	newRte = (RangeTblEntry *) node;
-
-	if (newRte->rtekind != RTE_RELATION)
-	{
-		return false;
-	}
-
-	/*
-	 * Search for the restrictions associated with the RTE. There better be
-	 * some, otherwise this query wouldn't be elegible as a router query.
-	 *
-	 * FIXME: We should probably use a hashtable here, to do efficient
-	 * lookup.
-	 */
-	foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
-	{
-		relationRestriction =
-			(RelationRestriction *) lfirst(relationRestrictionCell);
-
-		if (GetRTEIdentity(relationRestriction->rte) == GetRTEIdentity(newRte))
-		{
-			break;
-		}
-
-		relationRestriction = NULL;
-	}
-
-	replaceRteWithNullValues = (relationRestriction == NULL) ||
-							   relationRestriction->prunedShardIntervalList == NIL;
-
-	if (replaceRteWithNullValues)
-	{
-		ConvertRteToSubqueryWithEmptyResult(newRte);
-		return false;
-	}
-
-	Assert(relationRestriction != NULL);
-
-	shardIntervalList = relationRestriction->prunedShardIntervalList;
-
-	Assert(list_length(shardIntervalList) == 1);
-	shardInterval = (ShardInterval *) linitial(shardIntervalList);
-
-	shardId = shardInterval->shardId;
-	relationId = shardInterval->relationId;
-	relationName = get_rel_name(relationId);
-	AppendShardIdToName(&relationName, shardId);
-
-	schemaId = get_rel_namespace(relationId);
-	schemaName = get_namespace_name(schemaId);
-
-	ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL);
-
-	return false;
-}
-
-
 /*
  * RouterQueryJob creates a Job for the specified query to execute the
  * provided single shard select task.

@@ -14,6 +14,7 @@
 #include "distributed/citus_nodes.h"
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/multi_planner.h"
 
 static const char *CitusNodeTagNamesD[] = {
 	"MultiNode",

@@ -31,7 +32,8 @@ static const char *CitusNodeTagNamesD[] = {
 	"MultiPlan",
 	"Task",
 	"ShardInterval",
-	"ShardPlacement"
+	"ShardPlacement",
+	"RelationShard"
 };
 
 const char **CitusNodeTagNames = CitusNodeTagNamesD;

@@ -379,6 +381,7 @@ const ExtensibleNodeMethods nodeMethods[] =
 	DEFINE_NODE_METHODS(ShardInterval),
 	DEFINE_NODE_METHODS(MapMergeJob),
 	DEFINE_NODE_METHODS(ShardPlacement),
+	DEFINE_NODE_METHODS(RelationShard),
 	DEFINE_NODE_METHODS(Task),
 
 	/* nodes with only output support */

@@ -24,6 +24,7 @@
 #include "distributed/citus_nodes.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/multi_physical_planner.h"
+#include "distributed/multi_planner.h"
 #include "distributed/master_metadata_utility.h"
 #include "lib/stringinfo.h"
 #include "nodes/plannodes.h"

@@ -475,6 +476,17 @@ OutShardPlacement(OUTFUNC_ARGS)
 }
 
 
+void
+OutRelationShard(OUTFUNC_ARGS)
+{
+	WRITE_LOCALS(RelationShard);
+	WRITE_NODE_TYPE("RELATIONSHARD");
+
+	WRITE_OID_FIELD(relationId);
+	WRITE_UINT64_FIELD(shardId);
+}
+
+
 void
 OutTask(OUTFUNC_ARGS)
 {

@@ -495,7 +507,7 @@ OutTask(OUTFUNC_ARGS)
 	WRITE_NODE_FIELD(taskExecution);
 	WRITE_BOOL_FIELD(upsertQuery);
 	WRITE_BOOL_FIELD(insertSelectQuery);
-	WRITE_NODE_FIELD(selectShardList);
+	WRITE_NODE_FIELD(relationShardList);
 }
 
 #if (PG_VERSION_NUM < 90600)

@@ -612,6 +624,12 @@ outNode(StringInfo str, const void *obj)
 			appendStringInfoChar(str, '}');
 			break;
 
+		case T_RelationShard:
+			appendStringInfoChar(str, '{');
+			OutRelationShard(str, obj);
+			appendStringInfoChar(str, '}');
+			break;
+
 		default:
 			/* fall back into postgres' normal nodeToString machinery */
 			appendStringInfoString(str, nodeToString(obj));

@@ -14,6 +14,7 @@
 #include <math.h>
 
 #include "distributed/citus_nodefuncs.h"
+#include "distributed/multi_planner.h"
 #include "nodes/parsenodes.h"
 #include "nodes/readfuncs.h"
 

@@ -272,6 +273,18 @@ ReadShardPlacement(READFUNC_ARGS)
 }
 
 
+READFUNC_RET
+ReadRelationShard(READFUNC_ARGS)
+{
+	READ_LOCALS(RelationShard);
+
+	READ_OID_FIELD(relationId);
+	READ_UINT64_FIELD(shardId);
+
+	READ_DONE();
+}
+
+
 READFUNC_RET
 ReadTask(READFUNC_ARGS)
 {

@@ -291,7 +304,7 @@ ReadTask(READFUNC_ARGS)
 	READ_NODE_FIELD(taskExecution);
 	READ_BOOL_FIELD(upsertQuery);
 	READ_BOOL_FIELD(insertSelectQuery);
-	READ_NODE_FIELD(selectShardList);
+	READ_NODE_FIELD(relationShardList);
 
 	READ_DONE();
 }

@@ -1515,6 +1515,8 @@ CitusParseNodeString(void)
 		return_value = ReadMapMergeJob();
 	else if (MATCH("SHARDPLACEMENT", 14))
 		return_value = ReadShardPlacement();
+	else if (MATCH("RELATIONSHARD", 13))
+		return_value = ReadRelationShard();
 	else if (MATCH("TASK", 4))
 		return_value = ReadTask();
 	/* XXX: END Citus Nodes */

@@ -21,6 +21,7 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_router_executor.h"
+#include "distributed/multi_planner.h"
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"

@@ -303,6 +304,31 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode)
 }
 
 
+/*
+ * LockRelationShards takes locks on all shards in a list of RelationShards
+ * to prevent concurrent DML statements on those shards.
+ */
+void
+LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode)
+{
+	ListCell *relationShardCell = NULL;
+
+	/* lock shards in a consistent order to prevent deadlock */
+	relationShardList = SortList(relationShardList, CompareRelationShards);
+
+	foreach(relationShardCell, relationShardList)
+	{
+		RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
+		uint64 shardId = relationShard->shardId;
+
+		if (shardId != INVALID_SHARD_ID)
+		{
+			LockShardResource(shardId, lockMode);
+		}
+	}
+}
+
+
 /*
  * LockMetadataSnapshot acquires a lock needed to serialize changes to pg_dist_node
  * and all other metadata changes. Operations that modify pg_dist_node should acquire

@@ -15,6 +15,7 @@
 #include "catalog/pg_collation.h"
 #include "catalog/pg_type.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/multi_planner.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/worker_protocol.h"

@@ -132,6 +133,43 @@ CompareShardIntervalsById(const void *leftElement, const void *rightElement)
 }
 
 
+/*
+ * CompareRelationShards is a comparison function for sorting relation
+ * to shard mappings by their relation ID and then shard ID.
+ */
+int
+CompareRelationShards(const void *leftElement, const void *rightElement)
+{
+	RelationShard *leftRelationShard = *((RelationShard **) leftElement);
+	RelationShard *rightRelationShard = *((RelationShard **) rightElement);
+	Oid leftRelationId = leftRelationShard->relationId;
+	Oid rightRelationId = rightRelationShard->relationId;
+	int64 leftShardId = leftRelationShard->shardId;
+	int64 rightShardId = rightRelationShard->shardId;
+
+	if (leftRelationId > rightRelationId)
+	{
+		return 1;
+	}
+	else if (leftRelationId < rightRelationId)
+	{
+		return -1;
+	}
+	else if (leftShardId > rightShardId)
+	{
+		return 1;
+	}
+	else if (leftShardId < rightShardId)
+	{
+		return -1;
+	}
+	else
+	{
+		return 0;
+	}
+}
+
+
 /*
  * FindShardIntervalIndex finds index of given shard in sorted shard interval array.
  *

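CompareRelationShards gives relation-to-shard mappings one canonical order: relation OID first, then shard id. The commit relies on that order twice: RelationPrunesToMultipleShards uses it to spot a relation that pruned to two different shards, and LockRelationShardListResources uses it so every backend acquires shard locks in the same order and concurrent multi-shard modifications cannot deadlock. A condensed sketch of that locking pattern, restating LockRelationShardListResources from the diff above (the function name here is illustrative):

static void
LockRelationShardsInCanonicalOrder(List *relationShardList, LOCKMODE lockMode)
{
	ListCell *relationShardCell = NULL;

	/* same (relationId, shardId) order in every backend prevents deadlocks */
	relationShardList = SortList(relationShardList, CompareRelationShards);

	foreach(relationShardCell, relationShardList)
	{
		RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);

		/* entries without a real shard carry INVALID_SHARD_ID and are skipped */
		if (relationShard->shardId != INVALID_SHARD_ID)
		{
			LockShardResource(relationShard->shardId, lockMode);
		}
	}
}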
@@ -66,6 +66,7 @@ extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS);
 extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
 extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
 extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
+extern READFUNC_RET ReadRelationShard(READFUNC_ARGS);
 extern READFUNC_RET ReadTask(READFUNC_ARGS);
 
 extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);

@@ -75,6 +76,7 @@ extern void OutMultiPlan(OUTFUNC_ARGS);
 extern void OutShardInterval(OUTFUNC_ARGS);
 extern void OutMapMergeJob(OUTFUNC_ARGS);
 extern void OutShardPlacement(OUTFUNC_ARGS);
+extern void OutRelationShard(OUTFUNC_ARGS);
 extern void OutTask(OUTFUNC_ARGS);
 
 extern void OutMultiNode(OUTFUNC_ARGS);

@@ -55,7 +55,8 @@ typedef enum CitusNodeTag
 	T_MultiPlan,
 	T_Task,
 	T_ShardInterval,
-	T_ShardPlacement
+	T_ShardPlacement,
+	T_RelationShard
 } CitusNodeTag;

@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * deparse_shard_query.h
+ *
+ * Declarations for public functions and types related to deparsing shard
+ * queries.
+ *
+ * Copyright (c) 2014-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef DEPARSE_SHARD_QUERY_H
+#define DEPARSE_SHARD_QUERY_H
+
+#include "c.h"
+
+#include "nodes/nodes.h"
+#include "nodes/parsenodes.h"
+#include "nodes/pg_list.h"
+
+
+extern void RebuildQueryStrings(Query *originalQuery, List *taskList);
+extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
+
+
+#endif /* DEPARSE_SHARD_QUERY_H */

@@ -171,7 +171,7 @@ typedef struct Task
 	bool upsertQuery;          /* only applies to modify tasks */
 
 	bool insertSelectQuery;
-	List *selectShardList;     /* only applies INSERT/SELECT tasks */
+	List *relationShardList;   /* only applies INSERT/SELECT tasks */
 } Task;

@@ -13,6 +13,8 @@
 #include "nodes/plannodes.h"
 #include "nodes/relation.h"
 
+#include "distributed/citus_nodes.h"
+
 
 /* values used by jobs and tasks which do not require identifiers */
 #define INVALID_JOB_ID 0

@@ -38,6 +40,13 @@ typedef struct RelationRestriction
 	List *prunedShardIntervalList;
 } RelationRestriction;
 
+typedef struct RelationShard
+{
+	CitusNode type;
+	Oid relationId;
+	uint64 shardId;
+} RelationShard;
+
 
 extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
 								   ParamListInfo boundParams);

@@ -37,5 +37,10 @@ extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
 											  RangeTblEntry *subqueryRte);
 extern bool InsertSelectQuery(Query *query);
 extern Oid ExtractFirstDistributedTableId(Query *query);
+extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
+extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
+extern void AddShardIntervalRestrictionToSelect(Query *subqery,
+												ShardInterval *shardInterval);
+
 
 #endif /* MULTI_ROUTER_PLANNER_H */

@@ -79,6 +79,7 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
 /* Lock multiple shards for safe modification */
 extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
 extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
+extern void LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode);
 
 extern void LockMetadataSnapshot(LOCKMODE lockMode);

@@ -27,6 +27,8 @@ extern ShardInterval * LowestShardIntervalById(List *shardIntervalList);
 extern int CompareShardIntervals(const void *leftElement, const void *rightElement,
 								 FmgrInfo *typeCompareFunction);
 extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
+extern int CompareRelationShards(const void *leftElement,
+								 const void *rightElement);
 extern int FindShardIntervalIndex(ShardInterval *shardInterval);
 extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
 										  ShardInterval **shardIntervalCache,

@@ -334,7 +334,6 @@ id_title AS (SELECT id, title from articles_hash WHERE author_id = 2)
 SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
 DEBUG: predicate pruning for shardId 840001
 DEBUG: predicate pruning for shardId 840000
-DEBUG: Found no worker with all shard placements
 ERROR: cannot perform distributed planning on this query
 DETAIL: Complex table expressions are currently unsupported
 -- recursive CTEs are supported when filtered on partition column

@@ -420,8 +419,7 @@ DEBUG: predicate pruning for shardId 840006
 ERROR: cannot perform distributed planning on this query
 DETAIL: Complex table expressions are currently unsupported
 -- logically wrong query, query involves different shards
--- from the same table, but still router plannable due to
--- shard being placed on the same worker.
+-- from the same table
 WITH RECURSIVE hierarchy as (
 	SELECT *, 1 AS level
 	FROM company_employees

@@ -439,13 +437,8 @@ DEBUG: predicate pruning for shardId 840006
 DEBUG: predicate pruning for shardId 840003
 DEBUG: predicate pruning for shardId 840004
 DEBUG: predicate pruning for shardId 840005
-DEBUG: Creating router plan
-DEBUG: Plan is router executable
- company_id | employee_id | manager_id | level
------------+-------------+------------+-------
-          3 |           1 |          0 |     1
-(1 row)
-
+ERROR: cannot perform distributed planning on this query
+DETAIL: Complex table expressions are currently unsupported
 -- grouping sets are supported on single shard
 SELECT
 	id, substring(title, 2, 1) AS subtitle, count(*)

@@ -213,8 +213,7 @@ WITH RECURSIVE hierarchy as (
 SELECT * FROM hierarchy WHERE LEVEL <= 2;
 
 -- logically wrong query, query involves different shards
--- from the same table, but still router plannable due to
--- shard being placed on the same worker.
+-- from the same table
 WITH RECURSIVE hierarchy as (
 	SELECT *, 1 AS level
 	FROM company_employees