From 1112b254a73a48fe6d550350737a6adff8b3600b Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Thu, 11 Jun 2020 11:59:44 +0300 Subject: [PATCH] adapt recently added code for pg13 This commit mostly adds pg_get_triggerdef_command to our ruleutils_13. This doesn't add anything extra for ruleutils 13 so it is basically a copy of the change on ruleutils_12 --- src/backend/distributed/commands/multi_copy.c | 26 +- .../distributed/commands/utility_hook.c | 6 +- .../connection/placement_connection.c | 2 +- .../distributed/deparser/ruleutils_13.c | 313 +++++++++++++++++- .../executor/insert_select_executor.c | 6 +- .../distributed/executor/local_executor.c | 3 +- .../distributed/executor/multi_executor.c | 8 +- .../distributed/metadata/metadata_cache.c | 2 +- .../distributed/operations/node_protocol.c | 5 +- .../distributed/planner/distributed_planner.c | 12 +- .../distributed/planner/multi_explain.c | 12 +- .../planner/multi_physical_planner.c | 31 +- .../relation_restriction_equivalence.c | 14 +- .../distributed/planner/tdigest_extension.c | 1 + .../test/distributed_intermediate_results.c | 8 +- .../test/foreign_key_relationship_query.c | 12 +- .../transaction/relation_access_tracking.c | 2 +- .../utils/foreign_key_relationship.c | 2 +- src/backend/distributed/utils/maintenanced.c | 2 +- src/include/distributed/commands/multi_copy.h | 3 +- .../distributed/commands/utility_hook.h | 4 +- src/include/distributed/distributed_planner.h | 19 +- src/include/distributed/listutils.h | 3 +- src/include/distributed/version_compat.h | 38 ++- 24 files changed, 440 insertions(+), 94 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 650763739..c857e754f 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -214,8 +214,10 @@ typedef struct ShardConnections /* Local functions forward declarations */ -static void CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); -static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId); +static void CopyToExistingShards(CopyStmt *copyStatement, + QueryCompletionCompat *completionTag); +static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, + Oid relationId); static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, ShardConnections *shardConnections, bool stopOnFailure, @@ -316,7 +318,8 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool ContainsLocalPlacement(int64 shardId); -static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount); +static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uint64 + processedRowCount); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); static bool ShouldExecuteCopyLocally(bool isIntermediateResult); @@ -568,7 +571,8 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT * tables where we create new shards into which to copy rows. */ static void -CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId) +CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid + relationId) { /* allocate column values and nulls arrays */ Relation distributedRelation = table_open(relationId, RowExclusiveLock); @@ -746,12 +750,15 @@ CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, O } } -static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount) { + +static void +CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uint64 processedRowCount) +{ #if PG_VERSION_NUM >= PG_VERSION_13 - SetQueryCompletion(completionTag, CMDTAG_COPY, processedRowCount); + SetQueryCompletion(completionTag, CMDTAG_COPY, processedRowCount); #else - SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE, - "COPY " UINT64_FORMAT, processedRowCount); + SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, processedRowCount); #endif } @@ -2780,7 +2787,8 @@ CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName) * further processing is needed. */ Node * -ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, const char *queryString) +ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, const + char *queryString) { /* * Handle special COPY "resultid" FROM STDIN WITH (format result) commands diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index af80c8b31..3257aa4af 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -91,8 +91,7 @@ static bool IsDropSchemaOrDB(Node *parsetree); void CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context, ParamListInfo params, DestReceiver *dest, - QueryCompletionCompat *completionTag - ) + QueryCompletionCompat *completionTag) { PlannedStmt *plannedStmt = makeNode(PlannedStmt); plannedStmt->commandType = CMD_UTILITY; @@ -119,8 +118,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, ParamListInfo params, struct QueryEnvironment *queryEnv, DestReceiver *dest, - QueryCompletionCompat *completionTag - ) + QueryCompletionCompat *completionTag) { Node *parsetree = pstmt->utilityStmt; List *ddlJobs = NIL; diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 5de19f9cd..6075fe8dc 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -26,7 +26,7 @@ #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" #include "utils/hsearch.h" -#if PG_VERSION_NUM >= PG_VERSION_13 +#if PG_VERSION_NUM >= PG_VERSION_13 #include "common/hashfn.h" #endif #include "utils/memutils.h" diff --git a/src/backend/distributed/deparser/ruleutils_13.c b/src/backend/distributed/deparser/ruleutils_13.c index 08b502dba..5e18bf500 100644 --- a/src/backend/distributed/deparser/ruleutils_13.c +++ b/src/backend/distributed/deparser/ruleutils_13.c @@ -105,6 +105,7 @@ /* Pretty flags */ #define PRETTYFLAG_PAREN 0x0001 #define PRETTYFLAG_INDENT 0x0002 +#define PRETTYFLAG_SCHEMA 0x0004 /* Default line length for pretty-print wrapping: 0 means wrap always */ #define WRAP_COLUMN_DEFAULT 0 @@ -112,6 +113,7 @@ /* macros to test if pretty action needed */ #define PRETTY_PAREN(context) ((context)->prettyFlags & PRETTYFLAG_PAREN) #define PRETTY_INDENT(context) ((context)->prettyFlags & PRETTYFLAG_INDENT) +#define PRETTY_SCHEMA(context) ((context)->prettyFlags & PRETTYFLAG_SCHEMA) /* ---------- @@ -135,7 +137,7 @@ typedef struct ParseExprKind special_exprkind; /* set only for exprkinds needing special * handling */ Bitmapset *appendparents; /* if not null, map child Vars of these relids - * back to the parent rel */ + * back to the parent rel */ } deparse_context; /* @@ -434,6 +436,8 @@ static void get_from_clause_coldeflist(RangeTblFunction *rtfunc, deparse_context *context); static void get_tablesample_def(TableSampleClause *tablesample, deparse_context *context); +static char *pg_get_triggerdef_worker(Oid trigid, bool pretty); +static void set_simple_column_names(deparse_namespace *dpns); static void get_opclass_name(Oid opclass, Oid actual_datatype, StringInfo buf); static Node *processIndirection(Node *node, deparse_context *context); @@ -2185,7 +2189,7 @@ get_simple_values_rte(Query *query, TupleDesc resultDesc) if (list_length(query->targetList) != list_length(result->eref->colnames)) return NULL; /* this probably cannot happen */ - colno = 0; + colno = 0; forboth(lc, query->targetList, lcn, result->eref->colnames) { TargetEntry *tle = (TargetEntry *) lfirst(lc); @@ -3554,7 +3558,7 @@ get_variable(Var *var, int levelsup, bool istoplevel, deparse_context *context) if (var->varnosyn > 0 && var->varnosyn <= list_length(dpns->rtable) && dpns->plan == NULL) { rte = rt_fetch(var->varnosyn, dpns->rtable); - // if the rte var->varnosync points to is not a regular table and it is a join + // if the rte var->varnosync points to is not a regular table and it is a join // then the correct relname will be found with var->varnosync and var->varattnosync // TODO:: this is a workaround and it can be simplified. if (rte->rtekind == RTE_JOIN && rte->relid == 0 && var->varnosyn != var->varno) { @@ -3577,7 +3581,7 @@ get_variable(Var *var, int levelsup, bool istoplevel, deparse_context *context) */ if (context->appendparents && dpns->appendrels) { - + Index pvarno = varno; AttrNumber pvarattno = varattno; AppendRelInfo *appinfo = dpns->appendrels[pvarno]; @@ -7508,6 +7512,307 @@ get_tablesample_def(TableSampleClause *tablesample, deparse_context *context) } } +char * +pg_get_triggerdef_command(Oid triggerId) +{ + Assert(OidIsValid(triggerId)); + + /* no need to have pretty SQL command */ + bool prettyOutput = false; + return pg_get_triggerdef_worker(triggerId, prettyOutput); +} + +static char * +pg_get_triggerdef_worker(Oid trigid, bool pretty) +{ + HeapTuple ht_trig; + Form_pg_trigger trigrec; + StringInfoData buf; + Relation tgrel; + ScanKeyData skey[1]; + SysScanDesc tgscan; + int findx = 0; + char *tgname; + char *tgoldtable; + char *tgnewtable; + Oid argtypes[1]; /* dummy */ + Datum value; + bool isnull; + + /* + * Fetch the pg_trigger tuple by the Oid of the trigger + */ + tgrel = table_open(TriggerRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_trigger_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(trigid)); + + tgscan = systable_beginscan(tgrel, TriggerOidIndexId, true, + NULL, 1, skey); + + ht_trig = systable_getnext(tgscan); + + if (!HeapTupleIsValid(ht_trig)) + { + systable_endscan(tgscan); + table_close(tgrel, AccessShareLock); + return NULL; + } + + trigrec = (Form_pg_trigger) GETSTRUCT(ht_trig); + + /* + * Start the trigger definition. Note that the trigger's name should never + * be schema-qualified, but the trigger rel's name may be. + */ + initStringInfo(&buf); + + tgname = NameStr(trigrec->tgname); + appendStringInfo(&buf, "CREATE %sTRIGGER %s ", + OidIsValid(trigrec->tgconstraint) ? "CONSTRAINT " : "", + quote_identifier(tgname)); + + if (TRIGGER_FOR_BEFORE(trigrec->tgtype)) + appendStringInfoString(&buf, "BEFORE"); + else if (TRIGGER_FOR_AFTER(trigrec->tgtype)) + appendStringInfoString(&buf, "AFTER"); + else if (TRIGGER_FOR_INSTEAD(trigrec->tgtype)) + appendStringInfoString(&buf, "INSTEAD OF"); + else + elog(ERROR, "unexpected tgtype value: %d", trigrec->tgtype); + + if (TRIGGER_FOR_INSERT(trigrec->tgtype)) + { + appendStringInfoString(&buf, " INSERT"); + findx++; + } + if (TRIGGER_FOR_DELETE(trigrec->tgtype)) + { + if (findx > 0) + appendStringInfoString(&buf, " OR DELETE"); + else + appendStringInfoString(&buf, " DELETE"); + findx++; + } + if (TRIGGER_FOR_UPDATE(trigrec->tgtype)) + { + if (findx > 0) + appendStringInfoString(&buf, " OR UPDATE"); + else + appendStringInfoString(&buf, " UPDATE"); + findx++; + /* tgattr is first var-width field, so OK to access directly */ + if (trigrec->tgattr.dim1 > 0) + { + int i; + + appendStringInfoString(&buf, " OF "); + for (i = 0; i < trigrec->tgattr.dim1; i++) + { + char *attname; + + if (i > 0) + appendStringInfoString(&buf, ", "); + attname = get_attname(trigrec->tgrelid, + trigrec->tgattr.values[i], false); + appendStringInfoString(&buf, quote_identifier(attname)); + } + } + } + if (TRIGGER_FOR_TRUNCATE(trigrec->tgtype)) + { + if (findx > 0) + appendStringInfoString(&buf, " OR TRUNCATE"); + else + appendStringInfoString(&buf, " TRUNCATE"); + findx++; + } + + /* + * In non-pretty mode, always schema-qualify the target table name for + * safety. In pretty mode, schema-qualify only if not visible. + */ + appendStringInfo(&buf, " ON %s ", + pretty ? + generate_relation_name(trigrec->tgrelid, NIL) : + generate_qualified_relation_name(trigrec->tgrelid)); + + if (OidIsValid(trigrec->tgconstraint)) + { + if (OidIsValid(trigrec->tgconstrrelid)) + appendStringInfo(&buf, "FROM %s ", + generate_relation_name(trigrec->tgconstrrelid, NIL)); + if (!trigrec->tgdeferrable) + appendStringInfoString(&buf, "NOT "); + appendStringInfoString(&buf, "DEFERRABLE INITIALLY "); + if (trigrec->tginitdeferred) + appendStringInfoString(&buf, "DEFERRED "); + else + appendStringInfoString(&buf, "IMMEDIATE "); + } + + value = fastgetattr(ht_trig, Anum_pg_trigger_tgoldtable, + tgrel->rd_att, &isnull); + if (!isnull) + tgoldtable = NameStr(*DatumGetName(value)); + else + tgoldtable = NULL; + value = fastgetattr(ht_trig, Anum_pg_trigger_tgnewtable, + tgrel->rd_att, &isnull); + if (!isnull) + tgnewtable = NameStr(*DatumGetName(value)); + else + tgnewtable = NULL; + if (tgoldtable != NULL || tgnewtable != NULL) + { + appendStringInfoString(&buf, "REFERENCING "); + if (tgoldtable != NULL) + appendStringInfo(&buf, "OLD TABLE AS %s ", + quote_identifier(tgoldtable)); + if (tgnewtable != NULL) + appendStringInfo(&buf, "NEW TABLE AS %s ", + quote_identifier(tgnewtable)); + } + + if (TRIGGER_FOR_ROW(trigrec->tgtype)) + appendStringInfoString(&buf, "FOR EACH ROW "); + else + appendStringInfoString(&buf, "FOR EACH STATEMENT "); + + /* If the trigger has a WHEN qualification, add that */ + value = fastgetattr(ht_trig, Anum_pg_trigger_tgqual, + tgrel->rd_att, &isnull); + if (!isnull) + { + Node *qual; + char relkind; + deparse_context context; + deparse_namespace dpns; + RangeTblEntry *oldrte; + RangeTblEntry *newrte; + + appendStringInfoString(&buf, "WHEN ("); + + qual = stringToNode(TextDatumGetCString(value)); + + relkind = get_rel_relkind(trigrec->tgrelid); + + /* Build minimal OLD and NEW RTEs for the rel */ + oldrte = makeNode(RangeTblEntry); + oldrte->rtekind = RTE_RELATION; + oldrte->relid = trigrec->tgrelid; + oldrte->relkind = relkind; + oldrte->rellockmode = AccessShareLock; + oldrte->alias = makeAlias("old", NIL); + oldrte->eref = oldrte->alias; + oldrte->lateral = false; + oldrte->inh = false; + oldrte->inFromCl = true; + + newrte = makeNode(RangeTblEntry); + newrte->rtekind = RTE_RELATION; + newrte->relid = trigrec->tgrelid; + newrte->relkind = relkind; + newrte->rellockmode = AccessShareLock; + newrte->alias = makeAlias("new", NIL); + newrte->eref = newrte->alias; + newrte->lateral = false; + newrte->inh = false; + newrte->inFromCl = true; + + /* Build two-element rtable */ + memset(&dpns, 0, sizeof(dpns)); + dpns.rtable = list_make2(oldrte, newrte); + dpns.ctes = NIL; + set_rtable_names(&dpns, NIL, NULL); + set_simple_column_names(&dpns); + + /* Set up context with one-deep namespace stack */ + context.buf = &buf; + context.namespaces = list_make1(&dpns); + context.windowClause = NIL; + context.windowTList = NIL; + context.varprefix = true; + context.prettyFlags = pretty ? (PRETTYFLAG_PAREN | PRETTYFLAG_INDENT | PRETTYFLAG_SCHEMA) : PRETTYFLAG_INDENT; + context.wrapColumn = WRAP_COLUMN_DEFAULT; + context.indentLevel = PRETTYINDENT_STD; + context.special_exprkind = EXPR_KIND_NONE; + + get_rule_expr(qual, &context, false); + + appendStringInfoString(&buf, ") "); + } + + appendStringInfo(&buf, "EXECUTE FUNCTION %s(", + generate_function_name(trigrec->tgfoid, 0, + NIL, argtypes, + false, NULL, EXPR_KIND_NONE)); + + if (trigrec->tgnargs > 0) + { + char *p; + int i; + + value = fastgetattr(ht_trig, Anum_pg_trigger_tgargs, + tgrel->rd_att, &isnull); + if (isnull) + elog(ERROR, "tgargs is null for trigger %u", trigid); + p = (char *) VARDATA_ANY(DatumGetByteaPP(value)); + for (i = 0; i < trigrec->tgnargs; i++) + { + if (i > 0) + appendStringInfoString(&buf, ", "); + simple_quote_literal(&buf, p); + /* advance p to next string embedded in tgargs */ + while (*p) + p++; + p++; + } + } + + /* We deliberately do not put semi-colon at end */ + appendStringInfoChar(&buf, ')'); + + /* Clean up */ + systable_endscan(tgscan); + + table_close(tgrel, AccessShareLock); + + return buf.data; +} + +/* + * set_simple_column_names: fill in column aliases for non-query situations + * + * This handles EXPLAIN and cases where we only have relation RTEs. Without + * a join tree, we can't do anything smart about join RTEs, but we don't + * need to (note that EXPLAIN should never see join alias Vars anyway). + * If we do hit a join RTE we'll just process it like a non-table base RTE. + */ +static void +set_simple_column_names(deparse_namespace *dpns) +{ + ListCell *lc; + ListCell *lc2; + + /* Initialize dpns->rtable_columns to contain zeroed structs */ + dpns->rtable_columns = NIL; + while (list_length(dpns->rtable_columns) < list_length(dpns->rtable)) + dpns->rtable_columns = lappend(dpns->rtable_columns, + palloc0(sizeof(deparse_columns))); + + /* Assign unique column aliases within each RTE */ + forboth(lc, dpns->rtable, lc2, dpns->rtable_columns) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc); + deparse_columns *colinfo = (deparse_columns *) lfirst(lc2); + + set_relation_column_names(dpns, rte, colinfo); + } +} + /* * get_opclass_name - fetch name of an index operator class * diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index c85bbf958..dba7755d8 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -346,8 +346,10 @@ WrapSubquery(Query *subquery) /* create range table entries */ Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL); - RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(addRangeTableEntryForSubquery( - pstate, subquery, selectAlias, false, true)); + RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem( + addRangeTableEntryForSubquery( + pstate, subquery, + selectAlias, false, true)); outerQuery->rtable = list_make1(newRangeTableEntry); /* set the FROM expression to the subquery */ diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index d7b5d24b9..e15624776 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -334,7 +334,8 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tuple 0); int cursorOptions = 0; ParamListInfo paramListInfo = NULL; - PlannedStmt *localPlan = planner_compat(shardQuery, NULL, cursorOptions, paramListInfo); + PlannedStmt *localPlan = planner_compat(shardQuery, NULL, cursorOptions, + paramListInfo); totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, tupleDest, task, paramListInfo); diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 92720e13e..d0c04e396 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -629,10 +629,10 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, portal->visible = false; PortalDefineQuerySelectCompat(portal, - NULL, - "", - list_make1(queryPlan), - NULL); + NULL, + "", + list_make1(queryPlan), + NULL); PortalStart(portal, params, eflags, GetActiveSnapshot()); PortalRun(portal, count, false, true, dest, dest, NULL); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 030f12083..146675fa0 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -65,7 +65,7 @@ #include "utils/datum.h" #include "utils/elog.h" #include "utils/hsearch.h" -#if PG_VERSION_NUM >= PG_VERSION_13 +#if PG_VERSION_NUM >= PG_VERSION_13 #include "common/hashfn.h" #endif #include "utils/inval.h" diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index d5fca40cc..3d10b1303 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -223,7 +223,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) /* allocate DDL statements, and then save position in DDL statements */ List *tableDDLEventList = GetTableDDLEvents(relationId, includeSequenceDefaults); tableDDLEventCell = list_head(tableDDLEventList); - ListCellAndListWrapper* wrapper = palloc0(sizeof(ListCellAndListWrapper)); + ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper)); wrapper->list = tableDDLEventList; wrapper->listCell = tableDDLEventCell; functionContext->user_fctx = wrapper; @@ -239,7 +239,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) */ functionContext = SRF_PERCALL_SETUP(); - ListCellAndListWrapper* wrapper = (ListCellAndListWrapper *) functionContext->user_fctx; + ListCellAndListWrapper *wrapper = + (ListCellAndListWrapper *) functionContext->user_fctx; if (wrapper->listCell != NULL) { char *ddlStatement = (char *) lfirst(wrapper->listCell); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 4c22b4f19..3a7f77a95 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -129,10 +129,10 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext PlannedStmt * distributed_planner(Query *parse, #if PG_VERSION_NUM >= PG_VERSION_13 - const char *query_string, + const char *query_string, #endif - int cursorOptions, - ParamListInfo boundParams) + int cursorOptions, + ParamListInfo boundParams) { bool needsDistributedPlanning = false; bool fastPathRouterQuery = false; @@ -223,9 +223,9 @@ distributed_planner(Query *parse, * postgres' planner. */ planContext.plan = standard_planner_compat(planContext.query, - NULL, - planContext.cursorOptions, - planContext.boundParams); + NULL, + planContext.cursorOptions, + planContext.boundParams); if (needsDistributedPlanning) { result = PlanDistributedStmt(&planContext, rteIdCounter); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index b134078b5..c89e40d1c 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -221,6 +221,7 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors, bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION; + if (es->analyze) { ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for INSERT " @@ -313,7 +314,8 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) INSTR_TIME_SET_ZERO(planduration); - ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration, NULL); + ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration, + NULL); if (es->format == EXPLAIN_FORMAT_TEXT) { @@ -961,7 +963,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) INSTR_TIME_SET_CURRENT(planStart); - PlannedStmt *plan = pg_plan_query(query, 0, NULL); + PlannedStmt *plan = pg_plan_query_compat(query, NULL, 0, NULL); INSTR_TIME_SET_CURRENT(planDuration); INSTR_TIME_SUBTRACT(planDuration, planStart); @@ -1126,14 +1128,14 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, INSTR_TIME_SET_CURRENT(planstart); /* plan the query */ - PlannedStmt *plan = pg_plan_query(query, cursorOptions, params); + PlannedStmt *plan = pg_plan_query_compat(query, NULL, cursorOptions, params); INSTR_TIME_SET_CURRENT(planduration); INSTR_TIME_SUBTRACT(planduration, planstart); /* run it (if needed) and produce output */ - ExplainOnePlan(plan, into, es, queryString, params, queryEnv, - &planduration); + ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv, + &planduration, NULL); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 874b69bba..764d89e6a 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -131,7 +131,8 @@ static List * QueryFromList(List *rangeTableList); static Node * QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableList); static void SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry, - List *l_colnames, List *r_colnames, List* leftColVars, List* rightColVars); + List *l_colnames, List *r_colnames, + List *leftColVars, List *rightColVars); static RangeTblEntry * JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList, List *rangeTableList); static int ExtractRangeTableId(Node *node); @@ -1262,32 +1263,40 @@ JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList, List *rangeTable rangeTableEntry->joinaliasvars = joinedColumnVars; SetJoinRelatedColumnsCompat(rangeTableEntry, - leftColumnNames, rightColumnNames, leftColumnVars, rightColumnVars); + leftColumnNames, rightColumnNames, leftColumnVars, + rightColumnVars); return rangeTableEntry; } -static void SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry, - List *leftColumnNames, List *rightColumnNames, List* leftColumnVars, List* rightColumnVars) { +static void +SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry, + List *leftColumnNames, List *rightColumnNames, + List *leftColumnVars, List *rightColumnVars) +{ #if PG_VERSION_NUM >= PG_VERSION_13 /* We don't have any merged columns so set it to 0 */ - rangeTableEntry->joinmergedcols = 0; - Var* var = NULL; + rangeTableEntry->joinmergedcols = 0; + Var *var = NULL; int varId = 1; - foreach_ptr(var, leftColumnVars) { + foreach_ptr(var, leftColumnVars) + { rangeTableEntry->joinleftcols = lappend_int(rangeTableEntry->joinleftcols, varId); - varId++; - } + varId++; + } varId = 1; - foreach_ptr(var, rightColumnVars) { - rangeTableEntry->joinrightcols = lappend_int(rangeTableEntry->joinrightcols, varId); + foreach_ptr(var, rightColumnVars) + { + rangeTableEntry->joinrightcols = lappend_int(rangeTableEntry->joinrightcols, + varId); varId++; } #endif } + /* * ExtractRangeTableId gets the range table id from a node that could * either be a JoinExpr or RangeTblRef. diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index a322f4ccf..d12674be4 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -1358,6 +1358,7 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass ** continue; } int rtoffset = RangeTableOffsetCompat(root, appendRelInfo); + /* set the varno accordingly for this specific child */ varToBeAdded->varno = appendRelInfo->child_relid - rtoffset; @@ -1366,16 +1367,21 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass ** } } + /* * RangeTableOffsetCompat returns the range table offset(in glob->finalrtable) for the appendRelInfo. * For PG < 13 this is a no op. */ -static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo) { +static int +RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo) +{ #if PG_VERSION_NUM >= PG_VERSION_13 int i = 1; - for (i = 1 ; i < root->simple_rel_array_size; i++) { - RangeTblEntry* rte = root->simple_rte_array[i]; - if (rte->inh) { + for (i = 1; i < root->simple_rel_array_size; i++) + { + RangeTblEntry *rte = root->simple_rte_array[i]; + if (rte->inh) + { break; } } diff --git a/src/backend/distributed/planner/tdigest_extension.c b/src/backend/distributed/planner/tdigest_extension.c index 5b67c16cf..123b170d4 100644 --- a/src/backend/distributed/planner/tdigest_extension.c +++ b/src/backend/distributed/planner/tdigest_extension.c @@ -14,6 +14,7 @@ #include "catalog/pg_type.h" #include "distributed/metadata_cache.h" #include "distributed/tdigest_extension.h" +#include "distributed/version_compat.h" #include "parser/parse_func.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c index 474403d3e..2ab999685 100644 --- a/src/backend/distributed/test/distributed_intermediate_results.c +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -51,8 +51,8 @@ partition_task_list_results(PG_FUNCTION_ARGS) Query *parsedQuery = ParseQueryString(queryString, NULL, 0); PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString, - CURSOR_OPT_PARALLEL_OK, - NULL); + CURSOR_OPT_PARALLEL_OK, + NULL); if (!IsCitusCustomScan(queryPlan->planTree)) { ereport(ERROR, (errmsg("query must be distributed and shouldn't require " @@ -124,8 +124,8 @@ redistribute_task_list_results(PG_FUNCTION_ARGS) Query *parsedQuery = ParseQueryString(queryString, NULL, 0); PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString, - CURSOR_OPT_PARALLEL_OK, - NULL); + CURSOR_OPT_PARALLEL_OK, + NULL); if (!IsCitusCustomScan(queryPlan->planTree)) { ereport(ERROR, (errmsg("query must be distributed and shouldn't require " diff --git a/src/backend/distributed/test/foreign_key_relationship_query.c b/src/backend/distributed/test/foreign_key_relationship_query.c index d93938fa5..b1f1f7c8d 100644 --- a/src/backend/distributed/test/foreign_key_relationship_query.c +++ b/src/backend/distributed/test/foreign_key_relationship_query.c @@ -48,11 +48,11 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS) MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); List *refList = list_copy( cacheEntry->referencingRelationsViaForeignKey); - ListCellAndListWrapper* wrapper = palloc0(sizeof(ListCellAndListWrapper)); + ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper)); foreignRelationCell = list_head(refList); wrapper->list = refList; wrapper->listCell = foreignRelationCell; - functionContext->user_fctx = wrapper; + functionContext->user_fctx = wrapper; MemoryContextSwitchTo(oldContext); } @@ -64,7 +64,8 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS) */ functionContext = SRF_PERCALL_SETUP(); - ListCellAndListWrapper* wrapper = (ListCellAndListWrapper *) functionContext->user_fctx; + ListCellAndListWrapper *wrapper = + (ListCellAndListWrapper *) functionContext->user_fctx; if (wrapper->listCell != NULL) { Oid refId = lfirst_oid(wrapper->listCell); @@ -106,7 +107,7 @@ get_referenced_relation_id_list(PG_FUNCTION_ARGS) MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); List *refList = list_copy(cacheEntry->referencedRelationsViaForeignKey); foreignRelationCell = list_head(refList); - ListCellAndListWrapper* wrapper = palloc0(sizeof(ListCellAndListWrapper)); + ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper)); wrapper->list = refList; wrapper->listCell = foreignRelationCell; functionContext->user_fctx = wrapper; @@ -121,7 +122,8 @@ get_referenced_relation_id_list(PG_FUNCTION_ARGS) */ functionContext = SRF_PERCALL_SETUP(); - ListCellAndListWrapper* wrapper = (ListCellAndListWrapper *) functionContext->user_fctx; + ListCellAndListWrapper *wrapper = + (ListCellAndListWrapper *) functionContext->user_fctx; if (wrapper->listCell != NULL) { diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index c2af6917b..b5dafa135 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -29,7 +29,7 @@ #include "distributed/metadata_cache.h" #include "distributed/relation_access_tracking.h" #include "utils/hsearch.h" -#if PG_VERSION_NUM >= PG_VERSION_13 +#if PG_VERSION_NUM >= PG_VERSION_13 #include "common/hashfn.h" #endif #include "utils/lsyscache.h" diff --git a/src/backend/distributed/utils/foreign_key_relationship.c b/src/backend/distributed/utils/foreign_key_relationship.c index a116a5479..1dd61bcf5 100644 --- a/src/backend/distributed/utils/foreign_key_relationship.c +++ b/src/backend/distributed/utils/foreign_key_relationship.c @@ -31,7 +31,7 @@ #include "storage/lockdefs.h" #include "utils/fmgroids.h" #include "utils/hsearch.h" -#if PG_VERSION_NUM >= PG_VERSION_13 +#if PG_VERSION_NUM >= PG_VERSION_13 #include "common/hashfn.h" #endif #include "utils/memutils.h" diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 77927ce16..4d6b79d3e 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -51,7 +51,7 @@ #include "storage/lmgr.h" #include "storage/lwlock.h" #include "tcop/tcopprot.h" -#if PG_VERSION_NUM >= PG_VERSION_13 +#if PG_VERSION_NUM >= PG_VERSION_13 #include "common/hashfn.h" #endif #include "utils/memutils.h" diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index e840b73b6..c1aed58ee 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -156,7 +156,8 @@ extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void EndRemoteCopy(int64 shardId, List *connectionList); extern List * CreateRangeTable(Relation rel, AclMode requiredAccess); -extern Node * ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, +extern Node * ProcessCopyStmt(CopyStmt *copyStatement, + QueryCompletionCompat *completionTag, const char *queryString); extern void CheckCopyPermissions(CopyStmt *copyStatement); extern bool IsCopyResultStmt(CopyStmt *copyStatement); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index c9543d8b1..3b6893e27 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -54,11 +54,11 @@ extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, struct QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletionCompat *completionTag - ); + ); extern void CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context, ParamListInfo params, DestReceiver *dest, - QueryCompletionCompat * completionTag + QueryCompletionCompat *completionTag ); extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 9a71b939c..b385e8244 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -183,13 +183,18 @@ typedef struct CitusCustomScanPath } CitusCustomScanPath; -extern PlannedStmt * distributed_planner( - Query *parse, - #if PG_VERSION_NUM >= PG_VERSION_13 - const char *query_string, - #endif - int cursorOptions, - ParamListInfo boundParams); +#if PG_VERSION_NUM >= PG_VERSION_13 +extern PlannedStmt * distributed_planner(Query *parse, + const char *query_string, + int cursorOptions, + ParamListInfo boundParams); +#else +extern PlannedStmt * distributed_planner(Query *parse, + int cursorOptions, + ParamListInfo boundParams); +#endif + + extern List * ExtractRangeTableEntryList(Query *query); extern List * ExtractReferenceTableRTEList(List *rteList); extern bool NeedsDistributedPlanning(Query *query); diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index e37addbe1..fb37bf829 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -27,7 +27,8 @@ * in separate function calls, we need both the list and the current cell. * Therefore this wrapper stores both of them. */ -typedef struct ListCellAndListWrapper { +typedef struct ListCellAndListWrapper +{ List *list; ListCell *listCell; } ListCellAndListWrapper; diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 9c2fbfb91..52d4bd5d9 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -30,30 +30,34 @@ #if PG_VERSION_NUM >= PG_VERSION_13 #define lnext_compat(l, r) lnext(l, r) -#define list_delete_cell_compat(l,c,p) list_delete_cell(l,c) -#define pg_plan_query_compat(p,q,c,b) pg_plan_query(p,q,c,b) -#define planner_compat(p,q,c,b) planner(p,q,c,b) -#define standard_planner_compat(a,b,c,d) standard_planner(a,b,c,d) -#define PortalDefineQuerySelectCompat(a,b,c,e,f) PortalDefineQuery(a,b,c,CMDTAG_SELECT,e,f) -#define getOwnedSequencesCompat(a,b) getOwnedSequences(a) -#define ExplainOnePlanCompat(a,b,c,d,e,f,g,h) ExplainOnePlan(a,b,c,d,e,f,g,h) +#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c) +#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, q, c, b) +#define planner_compat(p, q, c, b) planner(p, q, c, b) +#define standard_planner_compat(a, b, c, d) standard_planner(a, b, c, d) +#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, \ + CMDTAG_SELECT, e, \ + f) +#define getOwnedSequencesCompat(a, b) getOwnedSequences(a) +#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g, \ + h) #define varoattno varattnosyn #define varnoold varnosyn -#define Set_ptr_value(a,b) a->ptr_value = b +#define Set_ptr_value(a, b) a->ptr_value = b #define RangeTableEntryFromNSItem(a) a->p_rte #define QueryCompletionCompat QueryCompletion #else /* pre PG13 */ #define lnext_compat(l, r) lnext(r) -#define list_delete_cell_compat(l,c,p) list_delete_cell(l,c,p) -#define pg_plan_query_compat(p,q,c,b) pg_plan_query(p,c,b) -#define planner_compat(p,q,c,b) planner(p,c,b) -#define standard_planner_compat(a,b,c,d) standard_planner(a,c,d) -#define PortalDefineQuerySelectCompat(a,b,c,e,f) PortalDefineQuery(a,b,c,"SELECT",e,f) -#define getOwnedSequencesCompat(a,b) getOwnedSequences(a,b) -#define ExplainOnePlanCompat(a,b,c,d,e,f,g,h) ExplainOnePlan(a,b,c,d,e,f,g) -#define Set_ptr_value(a,b) a->data.ptr_value = b +#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c, p) +#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, c, b) +#define planner_compat(p, q, c, b) planner(p, c, b) +#define standard_planner_compat(a, b, c, d) standard_planner(a, c, d) +#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, "SELECT", \ + e, f) +#define getOwnedSequencesCompat(a, b) getOwnedSequences(a, b) +#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g) +#define Set_ptr_value(a, b) a->data.ptr_value = b #define RangeTableEntryFromNSItem(a) a -#define QueryCompletionCompat char +#define QueryCompletionCompat char #endif #if PG_VERSION_NUM >= PG_VERSION_12