Add an extract_equality_filters_from_query diagnostic function

pull/5740/merge^2
Marco Slot 2022-02-23 11:19:02 +01:00
parent 9a8f11a086
commit cf2239f9cd
11 changed files with 535 additions and 33 deletions

View File

@ -89,7 +89,6 @@ static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);
static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
bool setPartitionedTablesInherited);
@ -114,9 +113,7 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
@ -374,7 +371,7 @@ ListContainsDistributedTableRTE(List *rangeTableList,
* Returns the next id. This can be used to call on a rangeTableList that may've
* been partially assigned. Should be set to 1 initially.
*/
static int
int
AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
{
ListCell *rangeTableCell = NULL;
@ -2156,7 +2153,7 @@ CopyPlanParamList(List *originalPlanParamList)
* plannerRestrictionContextList. Finally, the planner restriction context is
* inserted to the beginning of the plannerRestrictionContextList and it is returned.
*/
static PlannerRestrictionContext *
PlannerRestrictionContext *
CreateAndPushPlannerRestrictionContext(void)
{
PlannerRestrictionContext *plannerRestrictionContext =
@ -2237,7 +2234,7 @@ CurrentPlannerRestrictionContext(void)
* PopPlannerRestrictionContext removes the most recently added restriction contexts from
* the planner restriction context list. The function assumes the list is not empty.
*/
static void
void
PopPlannerRestrictionContext(void)
{
plannerRestrictionContextList = list_delete_first(plannerRestrictionContextList);

View File

@ -1149,38 +1149,14 @@ GenerateAttributeEquivalencesForJoinRestrictions(JoinRestrictionContext *
{
RestrictInfo *rinfo = (RestrictInfo *) lfirst(restrictionInfoList);
Expr *restrictionClause = rinfo->clause;
Var *leftVar = NULL;
Var *rightVar = NULL;
if (!IsA(restrictionClause, OpExpr))
if (!IsColumnEquiJoinClause(restrictionClause, &leftVar, &rightVar))
{
continue;
}
OpExpr *restrictionOpExpr = (OpExpr *) restrictionClause;
if (list_length(restrictionOpExpr->args) != 2)
{
continue;
}
if (!OperatorImplementsEquality(restrictionOpExpr->opno))
{
continue;
}
Node *leftNode = linitial(restrictionOpExpr->args);
Node *rightNode = lsecond(restrictionOpExpr->args);
/* we also don't want implicit coercions */
Expr *strippedLeftExpr = (Expr *) strip_implicit_coercions((Node *) leftNode);
Expr *strippedRightExpr = (Expr *) strip_implicit_coercions(
(Node *) rightNode);
if (!(IsA(strippedLeftExpr, Var) && IsA(strippedRightExpr, Var)))
{
continue;
}
Var *leftVar = (Var *) strippedLeftExpr;
Var *rightVar = (Var *) strippedRightExpr;
AttributeEquivalenceClass *attributeEquivalence = palloc0(
sizeof(AttributeEquivalenceClass));
attributeEquivalence->equivalenceId = AttributeEquivalenceId++;
@ -1201,6 +1177,47 @@ GenerateAttributeEquivalencesForJoinRestrictions(JoinRestrictionContext *
}
/*
* IsColumEquiJoinClause returns whether a given clause is of the form
* <left var> = <right var> and sets leftVar and rightVar accordingly.
*/
bool
IsColumnEquiJoinClause(Expr *restrictionClause, Var **leftVar, Var **rightVar)
{
if (!IsA(restrictionClause, OpExpr))
{
return false;
}
OpExpr *restrictionOpExpr = (OpExpr *) restrictionClause;
if (list_length(restrictionOpExpr->args) != 2)
{
return false;
}
if (!OperatorImplementsEquality(restrictionOpExpr->opno))
{
return false;
}
Node *leftNode = linitial(restrictionOpExpr->args);
Node *rightNode = lsecond(restrictionOpExpr->args);
/* we also don't want implicit coercions */
Expr *strippedLeftExpr = (Expr *) strip_implicit_coercions((Node *) leftNode);
Expr *strippedRightExpr = (Expr *) strip_implicit_coercions((Node *) rightNode);
if (!(IsA(strippedLeftExpr, Var) && IsA(strippedRightExpr, Var)))
{
return false;
}
*leftVar = (Var *) strippedLeftExpr;
*rightVar = (Var *) strippedRightExpr;
return true;
}
/*
* AddToAttributeEquivalenceClass is a key function for building the attribute
* equivalences. The function gets a plannerInfo, var and attribute equivalence

View File

@ -3,6 +3,8 @@
-- bump version to 11.0-1
#include "udfs/citus_disable_node/11.0-1.sql"
#include "udfs/create_distributed_function/11.0-1.sql"
#include "udfs/extract_equality_filters_from_query/11.0-1.sql"
#include "udfs/citus_internal_null_wrapper/11.0-1.sql"
#include "udfs/citus_check_connection_to_node/11.0-1.sql"
#include "udfs/citus_check_cluster_node_health/11.0-1.sql"

View File

@ -1,6 +1,9 @@
-- citus--11.0-1--10.2-4
DROP FUNCTION pg_catalog.create_distributed_function(regprocedure, text, text, bool);
DROP FUNCTION pg_catalog.extract_equality_filters_from_query(text);
DROP FUNCTION pg_catalog.citus_internal_null_wrapper(anyelement);
CREATE FUNCTION pg_catalog.master_apply_delete_command(text)
RETURNS integer
LANGUAGE C STRICT

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_null_wrapper(input anyelement)
RETURNS anyelement
LANGUAGE plpgsql
AS $function$
BEGIN
RETURN input;
END;
$function$;
COMMENT ON FUNCTION pg_catalog.citus_internal_null_wrapper(input anyelement)
IS 'returns the input as the same type';

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_null_wrapper(input anyelement)
RETURNS anyelement
LANGUAGE plpgsql
AS $function$
BEGIN
RETURN input;
END;
$function$;
COMMENT ON FUNCTION pg_catalog.citus_internal_null_wrapper(input anyelement)
IS 'returns the input as the same type';

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.extract_equality_filters_from_query(
query_string text,
OUT left_table_name regclass,
OUT left_column_id int,
OUT right_table_name regclass,
OUT right_column_id int)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$extract_equality_filters_from_query$$;
COMMENT ON FUNCTION pg_catalog.extract_equality_filters_from_query(text)
IS 'returns equality filters and joins in the query';

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.extract_equality_filters_from_query(
query_string text,
OUT left_table_name regclass,
OUT left_column_id int,
OUT right_table_name regclass,
OUT right_column_id int)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$extract_equality_filters_from_query$$;
COMMENT ON FUNCTION pg_catalog.extract_equality_filters_from_query(text)
IS 'returns equality filters and joins in the query';

View File

@ -0,0 +1,439 @@
/*-------------------------------------------------------------------------
*
* extract_equality_filters_from_query.c
*
* This file contains functions to parse queries and return details.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "c.h"
#include "fmgr.h"
#include <stddef.h>
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "distributed/shard_pruning.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/distributed_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/print.h"
#include "nodes/value.h"
#include "optimizer/planner.h"
#include "tcop/tcopprot.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/palloc.h"
/*
* EqualityFilter represents a <column1> = <constant> or <column1> = <column2>
* filter. Its intent is to return the relation ID and attribute number;
*/
typedef struct EqualityFilter
{
/* relation ID of column1 */
Oid leftRelationId;
/* attribute number of column1 in the relation */
int leftAttno;
/* relation ID of column2 or InvalidOid of comparing to a Const */
Oid rightRelationId;
/* attribute number of column2 in the relation or InvalidAttrNumber */
int rightAttno;
} EqualityFilter;
/* ReplaceNullsContext gets passed down in the ReplaceNulls walker */
typedef struct ReplaceNullsContext
{
/* oid of pg_catalog.citus_internal_null_wrapper() */
Oid nullWrapperFunctionId;
} ReplaceNullsContext;
static Node * ReplaceNulls(Node *node, ReplaceNullsContext *context);
static bool VarNullWrapperOpExprClause(Expr *clause, Oid wrapperFunctionId, Var **column);
static List * AddEqualityFilterToSet(List *equalityFilters, EqualityFilter *filter);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(extract_equality_filters_from_query);
/*
* extract_equality_filters_from_query returns all the equality filters in a
* query.
*/
Datum
extract_equality_filters_from_query(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
char *queryString = text_to_cstring(PG_GETARG_TEXT_P(0));
MemoryContext oldContext = CurrentMemoryContext;
/* capture filters via hooks */
PlannerRestrictionContext *plannerRestrictionContext =
CreateAndPushPlannerRestrictionContext();
/* use a subtransaction to correctly handle failures */
BeginInternalSubTransaction(NULL);
/*
* Use PG_TRY to make sure we pop the planner restriction context
* and be robust to planner errors due to current limitations.
*/
PG_TRY();
{
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
Query *query = RewriteRawQueryStmt(rawStmt, queryString, NULL, 0);
CmdType commandType = query->commandType;
if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
commandType != CMD_UPDATE && commandType != CMD_DELETE)
{
ereport(ERROR, (errmsg("can only process SELECT/INSERT/UPDATE/DELETE "
"commands")));
}
/* find the OID of pg_catalog.citus_internal_null_wrapper */
List *nameList = list_make2(makeString("pg_catalog"),
makeString("citus_internal_null_wrapper"));
Oid paramOids[1] = { ANYELEMENTOID };
Oid wrapperFunctionId = LookupFuncName(nameList, 1, paramOids, false);
/* replace NULL with pg_catalog.citus_internal_null_wrapper(NULL) */
ReplaceNullsContext replaceNullsContext;
replaceNullsContext.nullWrapperFunctionId = wrapperFunctionId;
query = (Query *) ReplaceNulls((Node *) query, &replaceNullsContext);
/* prepare the query by annotating RTEs */
List *rangeTableList = ExtractRangeTableEntryList(query);
AssignRTEIdentities(rangeTableList, 1);
int cursorOptions = 0;
/* run the planner to extract filters via hooks */
standard_planner_compat(query, cursorOptions, NULL);
RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext;
List *equalityFilters = NIL;
RelationRestriction *relationRestriction = NULL;
foreach_ptr(relationRestriction,
restrictionContext->relationRestrictionList)
{
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
Expr *clause = NULL;
foreach_ptr(clause, restrictClauseList)
{
Var *leftVar = NULL;
Const *rightConst = NULL;
if (!VarConstOpExprClause((OpExpr *) clause, &leftVar, &rightConst) &&
!VarNullWrapperOpExprClause(clause, wrapperFunctionId, &leftVar))
{
/* not of the form <column> = <constant> */
continue;
}
if (leftVar->varattno <= InvalidAttrNumber)
{
/* not a regular column */
continue;
}
/* found a <column> = <constant> expression */
EqualityFilter *constFilter = palloc0(sizeof(EqualityFilter));
constFilter->leftRelationId = relationRestriction->relationId;
constFilter->leftAttno = leftVar->varattno;
constFilter->rightRelationId = InvalidOid;
constFilter->rightAttno = InvalidAttrNumber;
equalityFilters = AddEqualityFilterToSet(equalityFilters, constFilter);
}
}
JoinRestrictionContext *joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext;
JoinRestriction *joinRestriction = NULL;
foreach_ptr(joinRestriction, joinRestrictionContext->joinRestrictionList)
{
PlannerInfo *plannerInfo = joinRestriction->plannerInfo;
RestrictInfo *restrictionInfo = NULL;
foreach_ptr(restrictionInfo, joinRestriction->joinRestrictInfoList)
{
Expr *restrictionClause = restrictionInfo->clause;
Var *leftVar = NULL;
Var *rightVar = NULL;
if (!IsColumnEquiJoinClause(restrictionClause, &leftVar, &rightVar))
{
/* not a regular equi-join */
continue;
}
if (leftVar->varattno <= InvalidAttrNumber ||
rightVar->varattno <= InvalidAttrNumber)
{
/* at least one of the vars is not a regular column */
continue;
}
RangeTblEntry *leftRTE = plannerInfo->simple_rte_array[leftVar->varno];
if (leftRTE->rtekind != RTE_RELATION)
{
/* left column does not belong to a relation */
continue;
}
RangeTblEntry *rightRTE = plannerInfo->simple_rte_array[rightVar->varno];
if (rightRTE->rtekind != RTE_RELATION)
{
/* right column does not belong to a relation */
continue;
}
/* found a <column1> = <column2> expression */
EqualityFilter *joinFilter = palloc0(sizeof(EqualityFilter));
joinFilter->leftRelationId = leftRTE->relid;
joinFilter->leftAttno = leftVar->varattno;
joinFilter->rightRelationId = rightRTE->relid;
joinFilter->rightAttno = rightVar->varattno;
equalityFilters = AddEqualityFilterToSet(equalityFilters, joinFilter);
}
}
/* return all the filters via the tuple store */
EqualityFilter *equalityFilter = NULL;
foreach_ptr(equalityFilter, equalityFilters)
{
Datum values[4];
bool isNulls[4];
memset(values, 0, sizeof(values));
memset(isNulls, 0, sizeof(isNulls));
values[0] = ObjectIdGetDatum(equalityFilter->leftRelationId);
values[1] = Int32GetDatum(equalityFilter->leftAttno);
if (equalityFilter->rightRelationId != InvalidOid)
{
/* join clause */
values[2] = ObjectIdGetDatum(equalityFilter->rightRelationId);
values[3] = Int32GetDatum(equalityFilter->rightAttno);
}
else
{
/* constant clause */
isNulls[2] = true;
isNulls[3] = true;
}
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
ReleaseCurrentSubTransaction();
}
PG_CATCH();
{
MemoryContextSwitchTo(oldContext);
ErrorData *edata = CopyErrorData();
ereport(NOTICE, (errcode(edata->sqlerrcode),
errmsg("could not analyze query: %s", edata->message)));
FlushErrorState();
RollbackAndReleaseCurrentSubTransaction();
}
PG_END_TRY();
PopPlannerRestrictionContext();
PG_RETURN_VOID();
}
/*
* ReplaceNulls replaces occurrences of <column> = NULL with a dummy value
* or a call to citus_internal_null_wrapper in order to preserve the
* expression in the planner.
*
* We prefer to use the dummy value when possible because it allows the planner to be
* infer equality filters across joins.
*/
static Node *
ReplaceNulls(Node *node, ReplaceNullsContext *context)
{
if (node == NULL)
{
return NULL;
}
if (IsA(node, Const))
{
Const *constNode = (Const *) node;
if (!constNode->constisnull)
{
return node;
}
Const *newConst = copyObject(constNode);
switch (constNode->consttype)
{
/* numeric types map to 0 */
case CHAROID:
case BOOLOID:
case INT2OID:
case INT4OID:
case INT8OID:
case FLOAT4OID:
case FLOAT8OID:
case NUMERICOID:
case OIDOID:
case REGPROCOID:
case REGPROCEDUREOID:
case REGOPEROID:
case REGOPERATOROID:
case REGCLASSOID:
case REGTYPEOID:
case REGCONFIGOID:
case REGDICTIONARYOID:
case REGROLEOID:
case REGNAMESPACEOID:
{
newConst->constisnull = false;
newConst->constvalue = (Datum) 0;
return (Node *) newConst;
}
case BPCHAROID:
case VARCHAROID:
case TEXTOID:
{
newConst->constisnull = false;
newConst->constvalue = CStringGetTextDatum("");
return (Node *) newConst;
}
/* other types use a wrapper */
default:
{
/* build the call to citus_internal_null_wrapper */
FuncExpr *funcExpr = makeNode(FuncExpr);
funcExpr->funcid = context->nullWrapperFunctionId;
funcExpr->funcretset = false;
funcExpr->funcvariadic = false;
funcExpr->funcformat = 0;
funcExpr->funccollid = 0;
funcExpr->inputcollid = 0;
funcExpr->location = -1;
/* pass NULL as an argument */
funcExpr->args = list_make1(newConst);
return (Node *) funcExpr;
}
}
}
else if (IsA(node, Query))
{
return (Node *) query_tree_mutator((Query *) node, ReplaceNulls,
context, 0);
}
return expression_tree_mutator(node, ReplaceNulls, context);
}
/*
* VarNullWrapperOpExprClause checks whether an expression is of the
* form <column> = pg_catalog.citus_internal_null_wrapper(..) and
* returns the column.
*/
static bool
VarNullWrapperOpExprClause(Expr *clause, Oid wrapperFunctionId, Var **column)
{
Node *leftOperand;
Node *rightOperand;
if (!BinaryOpExpression((Expr *) clause, &leftOperand, &rightOperand))
{
return false;
}
if (IsA(leftOperand, Var) && IsA(rightOperand, FuncExpr))
{
FuncExpr *funcExpr = (FuncExpr *) rightOperand;
if (funcExpr->funcid == wrapperFunctionId)
{
*column = (Var *) leftOperand;
return true;
}
}
else if (IsA(leftOperand, FuncExpr) && IsA(rightOperand, Var))
{
FuncExpr *funcExpr = (FuncExpr *) leftOperand;
if (funcExpr->funcid == wrapperFunctionId)
{
*column = (Var *) rightOperand;
return true;
}
}
return false;
}
/*
* AddEqualityFilterToSet adds an equality filter to the list if it does
* not already exist and returns the new set.
*/
static List *
AddEqualityFilterToSet(List *equalityFilterSet, EqualityFilter *newFilter)
{
EqualityFilter *existingFilter = NULL;
foreach_ptr(existingFilter, equalityFilterSet)
{
if (memcmp(existingFilter, newFilter, sizeof(EqualityFilter)) == 0)
{
/* filter already exists */
return equalityFilterSet;
}
}
return lappend(equalityFilterSet, newFilter);
}

View File

@ -222,6 +222,8 @@ extern List * ExtractRangeTableEntryList(Query *query);
extern bool NeedsDistributedPlanning(Query *query);
extern List * TranslatedVarsForRteIdentity(int rteIdentity);
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
extern PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
extern void PopPlannerRestrictionContext(void);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index restrictionIndex, RangeTblEntry *rte);
extern void multi_join_restriction_hook(PlannerInfo *root,
@ -236,6 +238,7 @@ extern void EnsurePartitionTableNotReplicated(Oid relationId);
extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
extern int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
extern int GetRTEIdentity(RangeTblEntry *rte);
extern bool GetOriginalInh(RangeTblEntry *rte);
extern LOCKMODE GetQueryLockMode(Query *query);

View File

@ -46,6 +46,7 @@ extern RelationRestriction * RelationRestrictionForRelation(
plannerRestrictionContext);
extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext *
joinRestrictionContext);
bool IsColumnEquiJoinClause(Expr *restrictionClause, Var **leftVar, Var **rightVar);
extern bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
RelationRestrictionContext *