mirror of https://github.com/citusdata/citus.git
adapt recently added code for pg13
This commit mostly adds pg_get_triggerdef_command to our ruleutils_13. This doesn't add anything extra for ruleutils 13 so it is basically a copy of the change on ruleutils_12pull/3900/head
parent
ff7a563c57
commit
1112b254a7
|
@ -214,8 +214,10 @@ typedef struct ShardConnections
|
|||
|
||||
|
||||
/* Local functions forward declarations */
|
||||
static void CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag);
|
||||
static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId);
|
||||
static void CopyToExistingShards(CopyStmt *copyStatement,
|
||||
QueryCompletionCompat *completionTag);
|
||||
static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag,
|
||||
Oid relationId);
|
||||
static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
|
||||
ShardConnections *shardConnections, bool
|
||||
stopOnFailure,
|
||||
|
@ -316,7 +318,8 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot,
|
|||
static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver);
|
||||
static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver);
|
||||
static bool ContainsLocalPlacement(int64 shardId);
|
||||
static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount);
|
||||
static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uint64
|
||||
processedRowCount);
|
||||
static void FinishLocalCopy(CitusCopyDestReceiver *copyDest);
|
||||
static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
|
||||
static bool ShouldExecuteCopyLocally(bool isIntermediateResult);
|
||||
|
@ -568,7 +571,8 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
|
|||
* tables where we create new shards into which to copy rows.
|
||||
*/
|
||||
static void
|
||||
CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId)
|
||||
CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid
|
||||
relationId)
|
||||
{
|
||||
/* allocate column values and nulls arrays */
|
||||
Relation distributedRelation = table_open(relationId, RowExclusiveLock);
|
||||
|
@ -746,12 +750,15 @@ CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, O
|
|||
}
|
||||
}
|
||||
|
||||
static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount) {
|
||||
|
||||
static void
|
||||
CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uint64 processedRowCount)
|
||||
{
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
SetQueryCompletion(completionTag, CMDTAG_COPY, processedRowCount);
|
||||
SetQueryCompletion(completionTag, CMDTAG_COPY, processedRowCount);
|
||||
#else
|
||||
SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE,
|
||||
"COPY " UINT64_FORMAT, processedRowCount);
|
||||
SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE,
|
||||
"COPY " UINT64_FORMAT, processedRowCount);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -2780,7 +2787,8 @@ CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName)
|
|||
* further processing is needed.
|
||||
*/
|
||||
Node *
|
||||
ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, const char *queryString)
|
||||
ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, const
|
||||
char *queryString)
|
||||
{
|
||||
/*
|
||||
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
|
||||
|
|
|
@ -91,8 +91,7 @@ static bool IsDropSchemaOrDB(Node *parsetree);
|
|||
void
|
||||
CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context,
|
||||
ParamListInfo params, DestReceiver *dest,
|
||||
QueryCompletionCompat *completionTag
|
||||
)
|
||||
QueryCompletionCompat *completionTag)
|
||||
{
|
||||
PlannedStmt *plannedStmt = makeNode(PlannedStmt);
|
||||
plannedStmt->commandType = CMD_UTILITY;
|
||||
|
@ -119,8 +118,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
ParamListInfo params,
|
||||
struct QueryEnvironment *queryEnv,
|
||||
DestReceiver *dest,
|
||||
QueryCompletionCompat *completionTag
|
||||
)
|
||||
QueryCompletionCompat *completionTag)
|
||||
{
|
||||
Node *parsetree = pstmt->utilityStmt;
|
||||
List *ddlJobs = NIL;
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "utils/hsearch.h"
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#include "common/hashfn.h"
|
||||
#endif
|
||||
#include "utils/memutils.h"
|
||||
|
|
|
@ -105,6 +105,7 @@
|
|||
/* Pretty flags */
|
||||
#define PRETTYFLAG_PAREN 0x0001
|
||||
#define PRETTYFLAG_INDENT 0x0002
|
||||
#define PRETTYFLAG_SCHEMA 0x0004
|
||||
|
||||
/* Default line length for pretty-print wrapping: 0 means wrap always */
|
||||
#define WRAP_COLUMN_DEFAULT 0
|
||||
|
@ -112,6 +113,7 @@
|
|||
/* macros to test if pretty action needed */
|
||||
#define PRETTY_PAREN(context) ((context)->prettyFlags & PRETTYFLAG_PAREN)
|
||||
#define PRETTY_INDENT(context) ((context)->prettyFlags & PRETTYFLAG_INDENT)
|
||||
#define PRETTY_SCHEMA(context) ((context)->prettyFlags & PRETTYFLAG_SCHEMA)
|
||||
|
||||
|
||||
/* ----------
|
||||
|
@ -135,7 +137,7 @@ typedef struct
|
|||
ParseExprKind special_exprkind; /* set only for exprkinds needing special
|
||||
* handling */
|
||||
Bitmapset *appendparents; /* if not null, map child Vars of these relids
|
||||
* back to the parent rel */
|
||||
* back to the parent rel */
|
||||
} deparse_context;
|
||||
|
||||
/*
|
||||
|
@ -434,6 +436,8 @@ static void get_from_clause_coldeflist(RangeTblFunction *rtfunc,
|
|||
deparse_context *context);
|
||||
static void get_tablesample_def(TableSampleClause *tablesample,
|
||||
deparse_context *context);
|
||||
static char *pg_get_triggerdef_worker(Oid trigid, bool pretty);
|
||||
static void set_simple_column_names(deparse_namespace *dpns);
|
||||
static void get_opclass_name(Oid opclass, Oid actual_datatype,
|
||||
StringInfo buf);
|
||||
static Node *processIndirection(Node *node, deparse_context *context);
|
||||
|
@ -2185,7 +2189,7 @@ get_simple_values_rte(Query *query, TupleDesc resultDesc)
|
|||
|
||||
if (list_length(query->targetList) != list_length(result->eref->colnames))
|
||||
return NULL; /* this probably cannot happen */
|
||||
colno = 0;
|
||||
colno = 0;
|
||||
forboth(lc, query->targetList, lcn, result->eref->colnames)
|
||||
{
|
||||
TargetEntry *tle = (TargetEntry *) lfirst(lc);
|
||||
|
@ -3554,7 +3558,7 @@ get_variable(Var *var, int levelsup, bool istoplevel, deparse_context *context)
|
|||
if (var->varnosyn > 0 && var->varnosyn <= list_length(dpns->rtable) && dpns->plan == NULL) {
|
||||
rte = rt_fetch(var->varnosyn, dpns->rtable);
|
||||
|
||||
// if the rte var->varnosync points to is not a regular table and it is a join
|
||||
// if the rte var->varnosync points to is not a regular table and it is a join
|
||||
// then the correct relname will be found with var->varnosync and var->varattnosync
|
||||
// TODO:: this is a workaround and it can be simplified.
|
||||
if (rte->rtekind == RTE_JOIN && rte->relid == 0 && var->varnosyn != var->varno) {
|
||||
|
@ -3577,7 +3581,7 @@ get_variable(Var *var, int levelsup, bool istoplevel, deparse_context *context)
|
|||
*/
|
||||
if (context->appendparents && dpns->appendrels)
|
||||
{
|
||||
|
||||
|
||||
Index pvarno = varno;
|
||||
AttrNumber pvarattno = varattno;
|
||||
AppendRelInfo *appinfo = dpns->appendrels[pvarno];
|
||||
|
@ -7508,6 +7512,307 @@ get_tablesample_def(TableSampleClause *tablesample, deparse_context *context)
|
|||
}
|
||||
}
|
||||
|
||||
char *
|
||||
pg_get_triggerdef_command(Oid triggerId)
|
||||
{
|
||||
Assert(OidIsValid(triggerId));
|
||||
|
||||
/* no need to have pretty SQL command */
|
||||
bool prettyOutput = false;
|
||||
return pg_get_triggerdef_worker(triggerId, prettyOutput);
|
||||
}
|
||||
|
||||
static char *
|
||||
pg_get_triggerdef_worker(Oid trigid, bool pretty)
|
||||
{
|
||||
HeapTuple ht_trig;
|
||||
Form_pg_trigger trigrec;
|
||||
StringInfoData buf;
|
||||
Relation tgrel;
|
||||
ScanKeyData skey[1];
|
||||
SysScanDesc tgscan;
|
||||
int findx = 0;
|
||||
char *tgname;
|
||||
char *tgoldtable;
|
||||
char *tgnewtable;
|
||||
Oid argtypes[1]; /* dummy */
|
||||
Datum value;
|
||||
bool isnull;
|
||||
|
||||
/*
|
||||
* Fetch the pg_trigger tuple by the Oid of the trigger
|
||||
*/
|
||||
tgrel = table_open(TriggerRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&skey[0],
|
||||
Anum_pg_trigger_oid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(trigid));
|
||||
|
||||
tgscan = systable_beginscan(tgrel, TriggerOidIndexId, true,
|
||||
NULL, 1, skey);
|
||||
|
||||
ht_trig = systable_getnext(tgscan);
|
||||
|
||||
if (!HeapTupleIsValid(ht_trig))
|
||||
{
|
||||
systable_endscan(tgscan);
|
||||
table_close(tgrel, AccessShareLock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
trigrec = (Form_pg_trigger) GETSTRUCT(ht_trig);
|
||||
|
||||
/*
|
||||
* Start the trigger definition. Note that the trigger's name should never
|
||||
* be schema-qualified, but the trigger rel's name may be.
|
||||
*/
|
||||
initStringInfo(&buf);
|
||||
|
||||
tgname = NameStr(trigrec->tgname);
|
||||
appendStringInfo(&buf, "CREATE %sTRIGGER %s ",
|
||||
OidIsValid(trigrec->tgconstraint) ? "CONSTRAINT " : "",
|
||||
quote_identifier(tgname));
|
||||
|
||||
if (TRIGGER_FOR_BEFORE(trigrec->tgtype))
|
||||
appendStringInfoString(&buf, "BEFORE");
|
||||
else if (TRIGGER_FOR_AFTER(trigrec->tgtype))
|
||||
appendStringInfoString(&buf, "AFTER");
|
||||
else if (TRIGGER_FOR_INSTEAD(trigrec->tgtype))
|
||||
appendStringInfoString(&buf, "INSTEAD OF");
|
||||
else
|
||||
elog(ERROR, "unexpected tgtype value: %d", trigrec->tgtype);
|
||||
|
||||
if (TRIGGER_FOR_INSERT(trigrec->tgtype))
|
||||
{
|
||||
appendStringInfoString(&buf, " INSERT");
|
||||
findx++;
|
||||
}
|
||||
if (TRIGGER_FOR_DELETE(trigrec->tgtype))
|
||||
{
|
||||
if (findx > 0)
|
||||
appendStringInfoString(&buf, " OR DELETE");
|
||||
else
|
||||
appendStringInfoString(&buf, " DELETE");
|
||||
findx++;
|
||||
}
|
||||
if (TRIGGER_FOR_UPDATE(trigrec->tgtype))
|
||||
{
|
||||
if (findx > 0)
|
||||
appendStringInfoString(&buf, " OR UPDATE");
|
||||
else
|
||||
appendStringInfoString(&buf, " UPDATE");
|
||||
findx++;
|
||||
/* tgattr is first var-width field, so OK to access directly */
|
||||
if (trigrec->tgattr.dim1 > 0)
|
||||
{
|
||||
int i;
|
||||
|
||||
appendStringInfoString(&buf, " OF ");
|
||||
for (i = 0; i < trigrec->tgattr.dim1; i++)
|
||||
{
|
||||
char *attname;
|
||||
|
||||
if (i > 0)
|
||||
appendStringInfoString(&buf, ", ");
|
||||
attname = get_attname(trigrec->tgrelid,
|
||||
trigrec->tgattr.values[i], false);
|
||||
appendStringInfoString(&buf, quote_identifier(attname));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (TRIGGER_FOR_TRUNCATE(trigrec->tgtype))
|
||||
{
|
||||
if (findx > 0)
|
||||
appendStringInfoString(&buf, " OR TRUNCATE");
|
||||
else
|
||||
appendStringInfoString(&buf, " TRUNCATE");
|
||||
findx++;
|
||||
}
|
||||
|
||||
/*
|
||||
* In non-pretty mode, always schema-qualify the target table name for
|
||||
* safety. In pretty mode, schema-qualify only if not visible.
|
||||
*/
|
||||
appendStringInfo(&buf, " ON %s ",
|
||||
pretty ?
|
||||
generate_relation_name(trigrec->tgrelid, NIL) :
|
||||
generate_qualified_relation_name(trigrec->tgrelid));
|
||||
|
||||
if (OidIsValid(trigrec->tgconstraint))
|
||||
{
|
||||
if (OidIsValid(trigrec->tgconstrrelid))
|
||||
appendStringInfo(&buf, "FROM %s ",
|
||||
generate_relation_name(trigrec->tgconstrrelid, NIL));
|
||||
if (!trigrec->tgdeferrable)
|
||||
appendStringInfoString(&buf, "NOT ");
|
||||
appendStringInfoString(&buf, "DEFERRABLE INITIALLY ");
|
||||
if (trigrec->tginitdeferred)
|
||||
appendStringInfoString(&buf, "DEFERRED ");
|
||||
else
|
||||
appendStringInfoString(&buf, "IMMEDIATE ");
|
||||
}
|
||||
|
||||
value = fastgetattr(ht_trig, Anum_pg_trigger_tgoldtable,
|
||||
tgrel->rd_att, &isnull);
|
||||
if (!isnull)
|
||||
tgoldtable = NameStr(*DatumGetName(value));
|
||||
else
|
||||
tgoldtable = NULL;
|
||||
value = fastgetattr(ht_trig, Anum_pg_trigger_tgnewtable,
|
||||
tgrel->rd_att, &isnull);
|
||||
if (!isnull)
|
||||
tgnewtable = NameStr(*DatumGetName(value));
|
||||
else
|
||||
tgnewtable = NULL;
|
||||
if (tgoldtable != NULL || tgnewtable != NULL)
|
||||
{
|
||||
appendStringInfoString(&buf, "REFERENCING ");
|
||||
if (tgoldtable != NULL)
|
||||
appendStringInfo(&buf, "OLD TABLE AS %s ",
|
||||
quote_identifier(tgoldtable));
|
||||
if (tgnewtable != NULL)
|
||||
appendStringInfo(&buf, "NEW TABLE AS %s ",
|
||||
quote_identifier(tgnewtable));
|
||||
}
|
||||
|
||||
if (TRIGGER_FOR_ROW(trigrec->tgtype))
|
||||
appendStringInfoString(&buf, "FOR EACH ROW ");
|
||||
else
|
||||
appendStringInfoString(&buf, "FOR EACH STATEMENT ");
|
||||
|
||||
/* If the trigger has a WHEN qualification, add that */
|
||||
value = fastgetattr(ht_trig, Anum_pg_trigger_tgqual,
|
||||
tgrel->rd_att, &isnull);
|
||||
if (!isnull)
|
||||
{
|
||||
Node *qual;
|
||||
char relkind;
|
||||
deparse_context context;
|
||||
deparse_namespace dpns;
|
||||
RangeTblEntry *oldrte;
|
||||
RangeTblEntry *newrte;
|
||||
|
||||
appendStringInfoString(&buf, "WHEN (");
|
||||
|
||||
qual = stringToNode(TextDatumGetCString(value));
|
||||
|
||||
relkind = get_rel_relkind(trigrec->tgrelid);
|
||||
|
||||
/* Build minimal OLD and NEW RTEs for the rel */
|
||||
oldrte = makeNode(RangeTblEntry);
|
||||
oldrte->rtekind = RTE_RELATION;
|
||||
oldrte->relid = trigrec->tgrelid;
|
||||
oldrte->relkind = relkind;
|
||||
oldrte->rellockmode = AccessShareLock;
|
||||
oldrte->alias = makeAlias("old", NIL);
|
||||
oldrte->eref = oldrte->alias;
|
||||
oldrte->lateral = false;
|
||||
oldrte->inh = false;
|
||||
oldrte->inFromCl = true;
|
||||
|
||||
newrte = makeNode(RangeTblEntry);
|
||||
newrte->rtekind = RTE_RELATION;
|
||||
newrte->relid = trigrec->tgrelid;
|
||||
newrte->relkind = relkind;
|
||||
newrte->rellockmode = AccessShareLock;
|
||||
newrte->alias = makeAlias("new", NIL);
|
||||
newrte->eref = newrte->alias;
|
||||
newrte->lateral = false;
|
||||
newrte->inh = false;
|
||||
newrte->inFromCl = true;
|
||||
|
||||
/* Build two-element rtable */
|
||||
memset(&dpns, 0, sizeof(dpns));
|
||||
dpns.rtable = list_make2(oldrte, newrte);
|
||||
dpns.ctes = NIL;
|
||||
set_rtable_names(&dpns, NIL, NULL);
|
||||
set_simple_column_names(&dpns);
|
||||
|
||||
/* Set up context with one-deep namespace stack */
|
||||
context.buf = &buf;
|
||||
context.namespaces = list_make1(&dpns);
|
||||
context.windowClause = NIL;
|
||||
context.windowTList = NIL;
|
||||
context.varprefix = true;
|
||||
context.prettyFlags = pretty ? (PRETTYFLAG_PAREN | PRETTYFLAG_INDENT | PRETTYFLAG_SCHEMA) : PRETTYFLAG_INDENT;
|
||||
context.wrapColumn = WRAP_COLUMN_DEFAULT;
|
||||
context.indentLevel = PRETTYINDENT_STD;
|
||||
context.special_exprkind = EXPR_KIND_NONE;
|
||||
|
||||
get_rule_expr(qual, &context, false);
|
||||
|
||||
appendStringInfoString(&buf, ") ");
|
||||
}
|
||||
|
||||
appendStringInfo(&buf, "EXECUTE FUNCTION %s(",
|
||||
generate_function_name(trigrec->tgfoid, 0,
|
||||
NIL, argtypes,
|
||||
false, NULL, EXPR_KIND_NONE));
|
||||
|
||||
if (trigrec->tgnargs > 0)
|
||||
{
|
||||
char *p;
|
||||
int i;
|
||||
|
||||
value = fastgetattr(ht_trig, Anum_pg_trigger_tgargs,
|
||||
tgrel->rd_att, &isnull);
|
||||
if (isnull)
|
||||
elog(ERROR, "tgargs is null for trigger %u", trigid);
|
||||
p = (char *) VARDATA_ANY(DatumGetByteaPP(value));
|
||||
for (i = 0; i < trigrec->tgnargs; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
appendStringInfoString(&buf, ", ");
|
||||
simple_quote_literal(&buf, p);
|
||||
/* advance p to next string embedded in tgargs */
|
||||
while (*p)
|
||||
p++;
|
||||
p++;
|
||||
}
|
||||
}
|
||||
|
||||
/* We deliberately do not put semi-colon at end */
|
||||
appendStringInfoChar(&buf, ')');
|
||||
|
||||
/* Clean up */
|
||||
systable_endscan(tgscan);
|
||||
|
||||
table_close(tgrel, AccessShareLock);
|
||||
|
||||
return buf.data;
|
||||
}
|
||||
|
||||
/*
|
||||
* set_simple_column_names: fill in column aliases for non-query situations
|
||||
*
|
||||
* This handles EXPLAIN and cases where we only have relation RTEs. Without
|
||||
* a join tree, we can't do anything smart about join RTEs, but we don't
|
||||
* need to (note that EXPLAIN should never see join alias Vars anyway).
|
||||
* If we do hit a join RTE we'll just process it like a non-table base RTE.
|
||||
*/
|
||||
static void
|
||||
set_simple_column_names(deparse_namespace *dpns)
|
||||
{
|
||||
ListCell *lc;
|
||||
ListCell *lc2;
|
||||
|
||||
/* Initialize dpns->rtable_columns to contain zeroed structs */
|
||||
dpns->rtable_columns = NIL;
|
||||
while (list_length(dpns->rtable_columns) < list_length(dpns->rtable))
|
||||
dpns->rtable_columns = lappend(dpns->rtable_columns,
|
||||
palloc0(sizeof(deparse_columns)));
|
||||
|
||||
/* Assign unique column aliases within each RTE */
|
||||
forboth(lc, dpns->rtable, lc2, dpns->rtable_columns)
|
||||
{
|
||||
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
|
||||
deparse_columns *colinfo = (deparse_columns *) lfirst(lc2);
|
||||
|
||||
set_relation_column_names(dpns, rte, colinfo);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* get_opclass_name - fetch name of an index operator class
|
||||
*
|
||||
|
|
|
@ -346,8 +346,10 @@ WrapSubquery(Query *subquery)
|
|||
|
||||
/* create range table entries */
|
||||
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
|
||||
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(addRangeTableEntryForSubquery(
|
||||
pstate, subquery, selectAlias, false, true));
|
||||
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
|
||||
addRangeTableEntryForSubquery(
|
||||
pstate, subquery,
|
||||
selectAlias, false, true));
|
||||
outerQuery->rtable = list_make1(newRangeTableEntry);
|
||||
|
||||
/* set the FROM expression to the subquery */
|
||||
|
|
|
@ -334,7 +334,8 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tuple
|
|||
0);
|
||||
int cursorOptions = 0;
|
||||
ParamListInfo paramListInfo = NULL;
|
||||
PlannedStmt *localPlan = planner_compat(shardQuery, NULL, cursorOptions, paramListInfo);
|
||||
PlannedStmt *localPlan = planner_compat(shardQuery, NULL, cursorOptions,
|
||||
paramListInfo);
|
||||
totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString,
|
||||
tupleDest, task,
|
||||
paramListInfo);
|
||||
|
|
|
@ -629,10 +629,10 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
|
|||
portal->visible = false;
|
||||
|
||||
PortalDefineQuerySelectCompat(portal,
|
||||
NULL,
|
||||
"",
|
||||
list_make1(queryPlan),
|
||||
NULL);
|
||||
NULL,
|
||||
"",
|
||||
list_make1(queryPlan),
|
||||
NULL);
|
||||
|
||||
PortalStart(portal, params, eflags, GetActiveSnapshot());
|
||||
PortalRun(portal, count, false, true, dest, dest, NULL);
|
||||
|
|
|
@ -65,7 +65,7 @@
|
|||
#include "utils/datum.h"
|
||||
#include "utils/elog.h"
|
||||
#include "utils/hsearch.h"
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#include "common/hashfn.h"
|
||||
#endif
|
||||
#include "utils/inval.h"
|
||||
|
|
|
@ -223,7 +223,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
|
|||
/* allocate DDL statements, and then save position in DDL statements */
|
||||
List *tableDDLEventList = GetTableDDLEvents(relationId, includeSequenceDefaults);
|
||||
tableDDLEventCell = list_head(tableDDLEventList);
|
||||
ListCellAndListWrapper* wrapper = palloc0(sizeof(ListCellAndListWrapper));
|
||||
ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper));
|
||||
wrapper->list = tableDDLEventList;
|
||||
wrapper->listCell = tableDDLEventCell;
|
||||
functionContext->user_fctx = wrapper;
|
||||
|
@ -239,7 +239,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
|
|||
*/
|
||||
functionContext = SRF_PERCALL_SETUP();
|
||||
|
||||
ListCellAndListWrapper* wrapper = (ListCellAndListWrapper *) functionContext->user_fctx;
|
||||
ListCellAndListWrapper *wrapper =
|
||||
(ListCellAndListWrapper *) functionContext->user_fctx;
|
||||
if (wrapper->listCell != NULL)
|
||||
{
|
||||
char *ddlStatement = (char *) lfirst(wrapper->listCell);
|
||||
|
|
|
@ -129,10 +129,10 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
|
|||
PlannedStmt *
|
||||
distributed_planner(Query *parse,
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
const char *query_string,
|
||||
const char *query_string,
|
||||
#endif
|
||||
int cursorOptions,
|
||||
ParamListInfo boundParams)
|
||||
int cursorOptions,
|
||||
ParamListInfo boundParams)
|
||||
{
|
||||
bool needsDistributedPlanning = false;
|
||||
bool fastPathRouterQuery = false;
|
||||
|
@ -223,9 +223,9 @@ distributed_planner(Query *parse,
|
|||
* postgres' planner.
|
||||
*/
|
||||
planContext.plan = standard_planner_compat(planContext.query,
|
||||
NULL,
|
||||
planContext.cursorOptions,
|
||||
planContext.boundParams);
|
||||
NULL,
|
||||
planContext.cursorOptions,
|
||||
planContext.boundParams);
|
||||
if (needsDistributedPlanning)
|
||||
{
|
||||
result = PlanDistributedStmt(&planContext, rteIdCounter);
|
||||
|
|
|
@ -221,6 +221,7 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
|||
|
||||
bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION;
|
||||
|
||||
|
||||
if (es->analyze)
|
||||
{
|
||||
ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for INSERT "
|
||||
|
@ -313,7 +314,8 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
|||
|
||||
INSTR_TIME_SET_ZERO(planduration);
|
||||
|
||||
ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration, NULL);
|
||||
ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration,
|
||||
NULL);
|
||||
|
||||
if (es->format == EXPLAIN_FORMAT_TEXT)
|
||||
{
|
||||
|
@ -961,7 +963,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
|
|||
|
||||
INSTR_TIME_SET_CURRENT(planStart);
|
||||
|
||||
PlannedStmt *plan = pg_plan_query(query, 0, NULL);
|
||||
PlannedStmt *plan = pg_plan_query_compat(query, NULL, 0, NULL);
|
||||
|
||||
INSTR_TIME_SET_CURRENT(planDuration);
|
||||
INSTR_TIME_SUBTRACT(planDuration, planStart);
|
||||
|
@ -1126,14 +1128,14 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
|
|||
INSTR_TIME_SET_CURRENT(planstart);
|
||||
|
||||
/* plan the query */
|
||||
PlannedStmt *plan = pg_plan_query(query, cursorOptions, params);
|
||||
PlannedStmt *plan = pg_plan_query_compat(query, NULL, cursorOptions, params);
|
||||
|
||||
INSTR_TIME_SET_CURRENT(planduration);
|
||||
INSTR_TIME_SUBTRACT(planduration, planstart);
|
||||
|
||||
/* run it (if needed) and produce output */
|
||||
ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
|
||||
&planduration);
|
||||
ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv,
|
||||
&planduration, NULL);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -131,7 +131,8 @@ static List * QueryFromList(List *rangeTableList);
|
|||
static Node * QueryJoinTree(MultiNode *multiNode, List *dependentJobList,
|
||||
List **rangeTableList);
|
||||
static void SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry,
|
||||
List *l_colnames, List *r_colnames, List* leftColVars, List* rightColVars);
|
||||
List *l_colnames, List *r_colnames,
|
||||
List *leftColVars, List *rightColVars);
|
||||
static RangeTblEntry * JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList,
|
||||
List *rangeTableList);
|
||||
static int ExtractRangeTableId(Node *node);
|
||||
|
@ -1262,32 +1263,40 @@ JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList, List *rangeTable
|
|||
rangeTableEntry->joinaliasvars = joinedColumnVars;
|
||||
|
||||
SetJoinRelatedColumnsCompat(rangeTableEntry,
|
||||
leftColumnNames, rightColumnNames, leftColumnVars, rightColumnVars);
|
||||
leftColumnNames, rightColumnNames, leftColumnVars,
|
||||
rightColumnVars);
|
||||
|
||||
return rangeTableEntry;
|
||||
}
|
||||
|
||||
static void SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry,
|
||||
List *leftColumnNames, List *rightColumnNames, List* leftColumnVars, List* rightColumnVars) {
|
||||
|
||||
static void
|
||||
SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry,
|
||||
List *leftColumnNames, List *rightColumnNames,
|
||||
List *leftColumnVars, List *rightColumnVars)
|
||||
{
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
|
||||
/* We don't have any merged columns so set it to 0 */
|
||||
rangeTableEntry->joinmergedcols = 0;
|
||||
Var* var = NULL;
|
||||
rangeTableEntry->joinmergedcols = 0;
|
||||
Var *var = NULL;
|
||||
int varId = 1;
|
||||
foreach_ptr(var, leftColumnVars) {
|
||||
foreach_ptr(var, leftColumnVars)
|
||||
{
|
||||
rangeTableEntry->joinleftcols = lappend_int(rangeTableEntry->joinleftcols, varId);
|
||||
varId++;
|
||||
}
|
||||
varId++;
|
||||
}
|
||||
varId = 1;
|
||||
foreach_ptr(var, rightColumnVars) {
|
||||
rangeTableEntry->joinrightcols = lappend_int(rangeTableEntry->joinrightcols, varId);
|
||||
foreach_ptr(var, rightColumnVars)
|
||||
{
|
||||
rangeTableEntry->joinrightcols = lappend_int(rangeTableEntry->joinrightcols,
|
||||
varId);
|
||||
varId++;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExtractRangeTableId gets the range table id from a node that could
|
||||
* either be a JoinExpr or RangeTblRef.
|
||||
|
|
|
@ -1358,6 +1358,7 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
|
|||
continue;
|
||||
}
|
||||
int rtoffset = RangeTableOffsetCompat(root, appendRelInfo);
|
||||
|
||||
/* set the varno accordingly for this specific child */
|
||||
varToBeAdded->varno = appendRelInfo->child_relid - rtoffset;
|
||||
|
||||
|
@ -1366,16 +1367,21 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RangeTableOffsetCompat returns the range table offset(in glob->finalrtable) for the appendRelInfo.
|
||||
* For PG < 13 this is a no op.
|
||||
*/
|
||||
static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo) {
|
||||
static int
|
||||
RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo)
|
||||
{
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
int i = 1;
|
||||
for (i = 1 ; i < root->simple_rel_array_size; i++) {
|
||||
RangeTblEntry* rte = root->simple_rte_array[i];
|
||||
if (rte->inh) {
|
||||
for (i = 1; i < root->simple_rel_array_size; i++)
|
||||
{
|
||||
RangeTblEntry *rte = root->simple_rte_array[i];
|
||||
if (rte->inh)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "catalog/pg_type.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/tdigest_extension.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "parser/parse_func.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
|
|
@ -51,8 +51,8 @@ partition_task_list_results(PG_FUNCTION_ARGS)
|
|||
|
||||
Query *parsedQuery = ParseQueryString(queryString, NULL, 0);
|
||||
PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString,
|
||||
CURSOR_OPT_PARALLEL_OK,
|
||||
NULL);
|
||||
CURSOR_OPT_PARALLEL_OK,
|
||||
NULL);
|
||||
if (!IsCitusCustomScan(queryPlan->planTree))
|
||||
{
|
||||
ereport(ERROR, (errmsg("query must be distributed and shouldn't require "
|
||||
|
@ -124,8 +124,8 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
|
|||
|
||||
Query *parsedQuery = ParseQueryString(queryString, NULL, 0);
|
||||
PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString,
|
||||
CURSOR_OPT_PARALLEL_OK,
|
||||
NULL);
|
||||
CURSOR_OPT_PARALLEL_OK,
|
||||
NULL);
|
||||
if (!IsCitusCustomScan(queryPlan->planTree))
|
||||
{
|
||||
ereport(ERROR, (errmsg("query must be distributed and shouldn't require "
|
||||
|
|
|
@ -48,11 +48,11 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS)
|
|||
MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
|
||||
List *refList = list_copy(
|
||||
cacheEntry->referencingRelationsViaForeignKey);
|
||||
ListCellAndListWrapper* wrapper = palloc0(sizeof(ListCellAndListWrapper));
|
||||
ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper));
|
||||
foreignRelationCell = list_head(refList);
|
||||
wrapper->list = refList;
|
||||
wrapper->listCell = foreignRelationCell;
|
||||
functionContext->user_fctx = wrapper;
|
||||
functionContext->user_fctx = wrapper;
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,8 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS)
|
|||
*/
|
||||
functionContext = SRF_PERCALL_SETUP();
|
||||
|
||||
ListCellAndListWrapper* wrapper = (ListCellAndListWrapper *) functionContext->user_fctx;
|
||||
ListCellAndListWrapper *wrapper =
|
||||
(ListCellAndListWrapper *) functionContext->user_fctx;
|
||||
if (wrapper->listCell != NULL)
|
||||
{
|
||||
Oid refId = lfirst_oid(wrapper->listCell);
|
||||
|
@ -106,7 +107,7 @@ get_referenced_relation_id_list(PG_FUNCTION_ARGS)
|
|||
MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
|
||||
List *refList = list_copy(cacheEntry->referencedRelationsViaForeignKey);
|
||||
foreignRelationCell = list_head(refList);
|
||||
ListCellAndListWrapper* wrapper = palloc0(sizeof(ListCellAndListWrapper));
|
||||
ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper));
|
||||
wrapper->list = refList;
|
||||
wrapper->listCell = foreignRelationCell;
|
||||
functionContext->user_fctx = wrapper;
|
||||
|
@ -121,7 +122,8 @@ get_referenced_relation_id_list(PG_FUNCTION_ARGS)
|
|||
*/
|
||||
functionContext = SRF_PERCALL_SETUP();
|
||||
|
||||
ListCellAndListWrapper* wrapper = (ListCellAndListWrapper *) functionContext->user_fctx;
|
||||
ListCellAndListWrapper *wrapper =
|
||||
(ListCellAndListWrapper *) functionContext->user_fctx;
|
||||
|
||||
if (wrapper->listCell != NULL)
|
||||
{
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "utils/hsearch.h"
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#include "common/hashfn.h"
|
||||
#endif
|
||||
#include "utils/lsyscache.h"
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
#include "storage/lockdefs.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/hsearch.h"
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#include "common/hashfn.h"
|
||||
#endif
|
||||
#include "utils/memutils.h"
|
||||
|
|
|
@ -51,7 +51,7 @@
|
|||
#include "storage/lmgr.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#include "common/hashfn.h"
|
||||
#endif
|
||||
#include "utils/memutils.h"
|
||||
|
|
|
@ -156,7 +156,8 @@ extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
|||
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
||||
extern void EndRemoteCopy(int64 shardId, List *connectionList);
|
||||
extern List * CreateRangeTable(Relation rel, AclMode requiredAccess);
|
||||
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag,
|
||||
extern Node * ProcessCopyStmt(CopyStmt *copyStatement,
|
||||
QueryCompletionCompat *completionTag,
|
||||
const char *queryString);
|
||||
extern void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||
extern bool IsCopyResultStmt(CopyStmt *copyStatement);
|
||||
|
|
|
@ -54,11 +54,11 @@ extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
|
|||
ProcessUtilityContext context, ParamListInfo params,
|
||||
struct QueryEnvironment *queryEnv, DestReceiver *dest,
|
||||
QueryCompletionCompat *completionTag
|
||||
);
|
||||
);
|
||||
extern void CitusProcessUtility(Node *node, const char *queryString,
|
||||
ProcessUtilityContext context, ParamListInfo params,
|
||||
DestReceiver *dest,
|
||||
QueryCompletionCompat * completionTag
|
||||
QueryCompletionCompat *completionTag
|
||||
);
|
||||
extern void MarkInvalidateForeignKeyGraph(void);
|
||||
extern void InvalidateForeignKeyGraphForDDL(void);
|
||||
|
|
|
@ -183,13 +183,18 @@ typedef struct CitusCustomScanPath
|
|||
} CitusCustomScanPath;
|
||||
|
||||
|
||||
extern PlannedStmt * distributed_planner(
|
||||
Query *parse,
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
const char *query_string,
|
||||
#endif
|
||||
int cursorOptions,
|
||||
ParamListInfo boundParams);
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
extern PlannedStmt * distributed_planner(Query *parse,
|
||||
const char *query_string,
|
||||
int cursorOptions,
|
||||
ParamListInfo boundParams);
|
||||
#else
|
||||
extern PlannedStmt * distributed_planner(Query *parse,
|
||||
int cursorOptions,
|
||||
ParamListInfo boundParams);
|
||||
#endif
|
||||
|
||||
|
||||
extern List * ExtractRangeTableEntryList(Query *query);
|
||||
extern List * ExtractReferenceTableRTEList(List *rteList);
|
||||
extern bool NeedsDistributedPlanning(Query *query);
|
||||
|
|
|
@ -27,7 +27,8 @@
|
|||
* in separate function calls, we need both the list and the current cell.
|
||||
* Therefore this wrapper stores both of them.
|
||||
*/
|
||||
typedef struct ListCellAndListWrapper {
|
||||
typedef struct ListCellAndListWrapper
|
||||
{
|
||||
List *list;
|
||||
ListCell *listCell;
|
||||
} ListCellAndListWrapper;
|
||||
|
|
|
@ -30,30 +30,34 @@
|
|||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#define lnext_compat(l, r) lnext(l, r)
|
||||
#define list_delete_cell_compat(l,c,p) list_delete_cell(l,c)
|
||||
#define pg_plan_query_compat(p,q,c,b) pg_plan_query(p,q,c,b)
|
||||
#define planner_compat(p,q,c,b) planner(p,q,c,b)
|
||||
#define standard_planner_compat(a,b,c,d) standard_planner(a,b,c,d)
|
||||
#define PortalDefineQuerySelectCompat(a,b,c,e,f) PortalDefineQuery(a,b,c,CMDTAG_SELECT,e,f)
|
||||
#define getOwnedSequencesCompat(a,b) getOwnedSequences(a)
|
||||
#define ExplainOnePlanCompat(a,b,c,d,e,f,g,h) ExplainOnePlan(a,b,c,d,e,f,g,h)
|
||||
#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c)
|
||||
#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, q, c, b)
|
||||
#define planner_compat(p, q, c, b) planner(p, q, c, b)
|
||||
#define standard_planner_compat(a, b, c, d) standard_planner(a, b, c, d)
|
||||
#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, \
|
||||
CMDTAG_SELECT, e, \
|
||||
f)
|
||||
#define getOwnedSequencesCompat(a, b) getOwnedSequences(a)
|
||||
#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g, \
|
||||
h)
|
||||
#define varoattno varattnosyn
|
||||
#define varnoold varnosyn
|
||||
#define Set_ptr_value(a,b) a->ptr_value = b
|
||||
#define Set_ptr_value(a, b) a->ptr_value = b
|
||||
#define RangeTableEntryFromNSItem(a) a->p_rte
|
||||
#define QueryCompletionCompat QueryCompletion
|
||||
#else /* pre PG13 */
|
||||
#define lnext_compat(l, r) lnext(r)
|
||||
#define list_delete_cell_compat(l,c,p) list_delete_cell(l,c,p)
|
||||
#define pg_plan_query_compat(p,q,c,b) pg_plan_query(p,c,b)
|
||||
#define planner_compat(p,q,c,b) planner(p,c,b)
|
||||
#define standard_planner_compat(a,b,c,d) standard_planner(a,c,d)
|
||||
#define PortalDefineQuerySelectCompat(a,b,c,e,f) PortalDefineQuery(a,b,c,"SELECT",e,f)
|
||||
#define getOwnedSequencesCompat(a,b) getOwnedSequences(a,b)
|
||||
#define ExplainOnePlanCompat(a,b,c,d,e,f,g,h) ExplainOnePlan(a,b,c,d,e,f,g)
|
||||
#define Set_ptr_value(a,b) a->data.ptr_value = b
|
||||
#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c, p)
|
||||
#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, c, b)
|
||||
#define planner_compat(p, q, c, b) planner(p, c, b)
|
||||
#define standard_planner_compat(a, b, c, d) standard_planner(a, c, d)
|
||||
#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, "SELECT", \
|
||||
e, f)
|
||||
#define getOwnedSequencesCompat(a, b) getOwnedSequences(a, b)
|
||||
#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g)
|
||||
#define Set_ptr_value(a, b) a->data.ptr_value = b
|
||||
#define RangeTableEntryFromNSItem(a) a
|
||||
#define QueryCompletionCompat char
|
||||
#define QueryCompletionCompat char
|
||||
#endif
|
||||
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||
|
||||
|
|
Loading…
Reference in New Issue