mirror of https://github.com/citusdata/citus.git
PG17 compatibility: Resolve compilation issues (#7699)
This PR provides successful compilation against PG17.0. - Remove ExecFreeExprContext call Relevant PG commit d060e921ea5aa47b6265174c32e1128cebdbc3dfpull/7746/headd060e921ea
- PG17 uses streaming IO in analyze, fix scan_analyze_next_block function Relevant PG commit 041b96802efa33d2bc9456f2ad946976b92b5ae1041b96802e
- Define ObjectClass for PG17+ only since it's removed Relevant PG commit: 89e5ef7e21812916c9cf9fcf56e45f0f7403465689e5ef7e21
- Remove ReorderBufferTupleBuf structure. Relevant PG commit: 08e6344fd6423210b339e92c069bb979ba4e7cd608e6344fd6
- Define colliculocale and daticulocale since they have been renamed Relevant PG commit: f696c0cd5f299f1b51e214efc55a22a782cc175df696c0cd5f
- makeStringConst defined in PG17 Relevant PG commit: de3600452b61d1bc3967e9e37e86db8956c8f577de3600452b
- RangeVarCallbackOwnsTable was replaced by RangeVarCallbackMaintainsTable Relevant PG commit: ecb0fd33720fab91df1207e85704f382f55e1eb7ecb0fd3372
- attstattarget is nullable, define pg compatible functions for it Relevant PG commit: 4f622503d6de975ac87448aea5cea7de4bc140d54f622503d6
- stxstattarget is nullable in PG17, write compat functions for it Relevant PG commit: 012460ee93c304fbc7220e5b55d9d0577fc766ab012460ee93
- Use ResourceOwner to track WaitEventSet in PG17 Relevant PG commit: 50c67c2019ab9ade8aa8768bfe604cd802fe859150c67c2019
- getIdentitySequence now uses Relation instead of relation_id Relevant PG commit: 509199587df73f06eda898ae13284292f4ae573a509199587d
- Remove no-op tuplestore_donestoring function Relevant PG commit: 75680c3d805e2323cd437ac567f0677fdfc7b68075680c3d80
- MergeAction can have 3 merge kinds (now enum) in PG17, write compat Relevant PG commit: 0294df2f1f842dfb0eed79007b21016f486a3c6c0294df2f1f
- EXPLAIN (MEMORY) is added, make changes to ExplainOnePlan Relevant PG commit: 5de890e3610d5a12cdaea36413d967cf5c544e205de890e361
- LIMIT_OPTION_DEFAULT has been removed as it's useless, use LIMIT_OPTION_COUNT Relevant PG commit: a6be0600ac3b71dda8277ab0fcbe59ee101ac1cea6be0600ac
- write compat for create_foreignscan_path bcs of more arguments in PG17 Relevant PG commit: 9e9931d2bf40e2fea447d779c2e133c2c1256ef39e9931d2bf
- pgprocno and lxid have been combined into a struct in PGPROC Relevant PG commits: 28f3915b73f75bd1b50ba070f56b34241fe53fd128f3915b73
ab355e3a88de745607f6dd4c21f0119b5c68f2adab355e3a88
024c521117579a6d356050ad3d78fdc95e44eefa024c521117
- Simplify CitusNewNode (#7434) postgres refactored newNode() in PG 17, the main point for doing this is the original tricks is no longer neccessary for modern compilers[1]. This does the same for Citus. This should have no backward compatibility issues since it just replaces palloc0fast with palloc0. This is good for forward compatibility since palloc0fast no longer exists in PG 17. [1] https://www.postgresql.org/message-id/b51f1fa7-7e6a-4ecc-936d-90a8a1659e7c@iki.fi (cherry picked from commit4b295cc
)
parent
9d364332ac
commit
da2624cee8
|
@ -1924,11 +1924,6 @@ ColumnarScan_EndCustomScan(CustomScanState *node)
|
|||
*/
|
||||
TableScanDesc scanDesc = node->ss.ss_currentScanDesc;
|
||||
|
||||
/*
|
||||
* Free the exprcontext
|
||||
*/
|
||||
ExecFreeExprContext(&node->ss.ps);
|
||||
|
||||
/*
|
||||
* clean out the tuple table
|
||||
*/
|
||||
|
|
|
@ -1424,8 +1424,13 @@ ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode, int timeout,
|
|||
|
||||
|
||||
static bool
|
||||
columnar_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
|
||||
columnar_scan_analyze_next_block(TableScanDesc scan,
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
ReadStream *stream)
|
||||
#else
|
||||
BlockNumber blockno,
|
||||
BufferAccessStrategy bstrategy)
|
||||
#endif
|
||||
{
|
||||
/*
|
||||
* Our access method is not pages based, i.e. tuples are not confined
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
#include "utils/rel.h"
|
||||
#include "utils/typcache.h"
|
||||
|
||||
#include "pg_version_constants.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
|
||||
|
@ -435,6 +437,74 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation
|
|||
return;
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
|
||||
/* Check the ReorderBufferChange's action type and handle them accordingly.*/
|
||||
switch (change->action)
|
||||
{
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
{
|
||||
/* For insert action, only new tuple should always be translated*/
|
||||
HeapTuple sourceRelationNewTuple = change->data.tp.newtuple;
|
||||
HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc(
|
||||
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
|
||||
change->data.tp.newtuple = targetRelationNewTuple;
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* For update changes both old and new tuples need to be translated for target relation
|
||||
* if the REPLICA IDENTITY is set to FULL. Otherwise, only the new tuple needs to be
|
||||
* translated for target relation.
|
||||
*/
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
{
|
||||
/* For update action, new tuple should always be translated*/
|
||||
/* Get the new tuple from the ReorderBufferChange, and translate it to target relation. */
|
||||
HeapTuple sourceRelationNewTuple = change->data.tp.newtuple;
|
||||
HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc(
|
||||
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
|
||||
change->data.tp.newtuple = targetRelationNewTuple;
|
||||
|
||||
/*
|
||||
* Format oldtuple according to the target relation. If the column values of replica
|
||||
* identiy change, then the old tuple is non-null and needs to be formatted according
|
||||
* to the target relation schema.
|
||||
*/
|
||||
if (change->data.tp.oldtuple != NULL)
|
||||
{
|
||||
HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple;
|
||||
HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc(
|
||||
sourceRelationOldTuple,
|
||||
sourceRelationDesc,
|
||||
targetRelationDesc);
|
||||
|
||||
change->data.tp.oldtuple = targetRelationOldTuple;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
{
|
||||
/* For delete action, only old tuple should be translated*/
|
||||
HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple;
|
||||
HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc(
|
||||
sourceRelationOldTuple,
|
||||
sourceRelationDesc,
|
||||
targetRelationDesc);
|
||||
|
||||
change->data.tp.oldtuple = targetRelationOldTuple;
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
/* Do nothing for other action types. */
|
||||
break;
|
||||
}
|
||||
}
|
||||
#else
|
||||
|
||||
/* Check the ReorderBufferChange's action type and handle them accordingly.*/
|
||||
switch (change->action)
|
||||
{
|
||||
|
@ -499,4 +569,5 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation
|
|||
break;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
|
|||
* ICU-related field. Only the libc-related fields or the ICU-related field
|
||||
* is set, never both.
|
||||
*/
|
||||
char *colliculocale;
|
||||
char *colllocale;
|
||||
bool isnull;
|
||||
|
||||
Datum datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collcollate,
|
||||
|
@ -101,17 +101,17 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
|
|||
collctype = NULL;
|
||||
}
|
||||
|
||||
datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_colliculocale, &isnull);
|
||||
datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_colllocale, &isnull);
|
||||
if (!isnull)
|
||||
{
|
||||
colliculocale = TextDatumGetCString(datum);
|
||||
colllocale = TextDatumGetCString(datum);
|
||||
}
|
||||
else
|
||||
{
|
||||
colliculocale = NULL;
|
||||
colllocale = NULL;
|
||||
}
|
||||
|
||||
Assert((collcollate && collctype) || colliculocale);
|
||||
Assert((collcollate && collctype) || colllocale);
|
||||
#else
|
||||
|
||||
/*
|
||||
|
@ -147,12 +147,12 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
|
|||
*quotedCollationName, providerString);
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
if (colliculocale)
|
||||
if (colllocale)
|
||||
{
|
||||
appendStringInfo(&collationNameDef,
|
||||
", locale = %s",
|
||||
quote_literal_cstr(colliculocale));
|
||||
pfree(colliculocale);
|
||||
quote_literal_cstr(colllocale));
|
||||
pfree(colllocale);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -71,7 +71,9 @@ static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple,
|
|||
TupleDesc DbRoleSettingDescription);
|
||||
static char * GetDatabaseNameFromDbRoleSetting(HeapTuple tuple,
|
||||
TupleDesc DbRoleSettingDescription);
|
||||
#if PG_VERSION_NUM < PG_VERSION_17
|
||||
static Node * makeStringConst(char *str, int location);
|
||||
#endif
|
||||
static Node * makeIntConst(int val, int location);
|
||||
static Node * makeFloatConst(char *str, int location);
|
||||
static const char * WrapQueryInAlterRoleIfExistsCall(const char *query, RoleSpec *role);
|
||||
|
@ -949,6 +951,8 @@ PreprocessCreateRoleStmt(Node *node, const char *queryString,
|
|||
}
|
||||
|
||||
|
||||
#if PG_VERSION_NUM < PG_VERSION_17
|
||||
|
||||
/*
|
||||
* makeStringConst creates a Const Node that stores a given string
|
||||
*
|
||||
|
@ -972,6 +976,9 @@ makeStringConst(char *str, int location)
|
|||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* makeIntConst creates a Const Node that stores a given integer
|
||||
*
|
||||
|
|
|
@ -651,14 +651,15 @@ GetAlterIndexStatisticsCommands(Oid indexOid)
|
|||
}
|
||||
|
||||
Form_pg_attribute targetAttr = (Form_pg_attribute) GETSTRUCT(attTuple);
|
||||
if (targetAttr->attstattarget != DEFAULT_STATISTICS_TARGET)
|
||||
int32 targetAttstattarget = getAttstattarget_compat(attTuple);
|
||||
if (targetAttstattarget != DEFAULT_STATISTICS_TARGET)
|
||||
{
|
||||
char *indexNameWithSchema = generate_qualified_relation_name(indexOid);
|
||||
|
||||
char *command =
|
||||
GenerateAlterIndexColumnSetStatsCommand(indexNameWithSchema,
|
||||
targetAttr->attnum,
|
||||
targetAttr->attstattarget);
|
||||
targetAttstattarget);
|
||||
|
||||
alterIndexStatisticsCommandList =
|
||||
lappend(alterIndexStatisticsCommandList,
|
||||
|
@ -773,9 +774,10 @@ CreateAlterCommandIfTargetNotDefault(Oid statsOid)
|
|||
}
|
||||
|
||||
Form_pg_statistic_ext statisticsForm = (Form_pg_statistic_ext) GETSTRUCT(tup);
|
||||
int16 currentStxstattarget = getStxstattarget_compat(tup);
|
||||
ReleaseSysCache(tup);
|
||||
|
||||
if (statisticsForm->stxstattarget == -1)
|
||||
if (currentStxstattarget == -1)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
@ -785,7 +787,8 @@ CreateAlterCommandIfTargetNotDefault(Oid statsOid)
|
|||
char *schemaName = get_namespace_name(statisticsForm->stxnamespace);
|
||||
char *statName = NameStr(statisticsForm->stxname);
|
||||
|
||||
alterStatsStmt->stxstattarget = statisticsForm->stxstattarget;
|
||||
alterStatsStmt->stxstattarget = getAlterStatsStxstattarget_compat(
|
||||
currentStxstattarget);
|
||||
alterStatsStmt->defnames = list_make2(makeString(schemaName), makeString(statName));
|
||||
|
||||
return DeparseAlterStatisticsStmt((Node *) alterStatsStmt);
|
||||
|
|
|
@ -866,7 +866,8 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
|
|||
*waitCount = 0;
|
||||
}
|
||||
|
||||
WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
|
||||
WaitEventSet *waitEventSet = CreateWaitEventSet(WaitEventSetTracker_compat,
|
||||
eventSetSize);
|
||||
EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet),
|
||||
waitEventSet);
|
||||
|
||||
|
|
|
@ -1130,7 +1130,7 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
|||
|
||||
/* allocate pending connections + 2 for the signal latch and postmaster death */
|
||||
/* (CreateWaitEventSet makes room for pgwin32_signal_event automatically) */
|
||||
WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext,
|
||||
WaitEventSet *waitEventSet = CreateWaitEventSet(WaitEventSetTracker_compat,
|
||||
pendingConnectionCount + 2);
|
||||
|
||||
for (int connectionIndex = 0; connectionIndex < pendingConnectionCount;
|
||||
|
|
|
@ -395,7 +395,8 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
|
|||
if (attributeForm->attidentity && includeIdentityDefaults)
|
||||
{
|
||||
bool missing_ok = false;
|
||||
Oid seqOid = getIdentitySequence(RelationGetRelid(relation),
|
||||
Oid seqOid = getIdentitySequence(identitySequenceRelation_compat(
|
||||
relation),
|
||||
attributeForm->attnum, missing_ok);
|
||||
|
||||
if (includeIdentityDefaults == INCLUDE_IDENTITY)
|
||||
|
@ -738,7 +739,18 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
|
|||
* If the user changed the column's statistics target, create
|
||||
* alter statement and add statement to a list for later processing.
|
||||
*/
|
||||
if (attributeForm->attstattarget >= 0)
|
||||
HeapTuple atttuple = SearchSysCache2(ATTNUM,
|
||||
ObjectIdGetDatum(tableRelationId),
|
||||
Int16GetDatum(attributeForm->attnum));
|
||||
if (!HeapTupleIsValid(atttuple))
|
||||
{
|
||||
elog(ERROR, "cache lookup failed for attribute %d of relation %u",
|
||||
attributeForm->attnum, tableRelationId);
|
||||
}
|
||||
|
||||
int32 targetAttstattarget = getAttstattarget_compat(atttuple);
|
||||
ReleaseSysCache(atttuple);
|
||||
if (targetAttstattarget >= 0)
|
||||
{
|
||||
StringInfoData statement = { NULL, 0, 0, 0 };
|
||||
initStringInfo(&statement);
|
||||
|
@ -746,7 +758,7 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
|
|||
appendStringInfo(&statement, "ALTER COLUMN %s ",
|
||||
quote_identifier(attributeName));
|
||||
appendStringInfo(&statement, "SET STATISTICS %d",
|
||||
attributeForm->attstattarget);
|
||||
targetAttstattarget);
|
||||
|
||||
columnOptionList = lappend(columnOptionList, statement.data);
|
||||
}
|
||||
|
|
|
@ -177,8 +177,9 @@ AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
|
|||
static void
|
||||
AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt)
|
||||
{
|
||||
appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d", NameListToQuotedString(
|
||||
stmt->defnames), stmt->stxstattarget);
|
||||
appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d",
|
||||
NameListToQuotedString(stmt->defnames),
|
||||
getIntStxstattarget_compat(stmt->stxstattarget));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -4740,7 +4740,7 @@ BuildWaitEventSet(List *sessionList)
|
|||
int eventSetSize = GetEventSetSize(sessionList);
|
||||
|
||||
WaitEventSet *waitEventSet =
|
||||
CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
|
||||
CreateWaitEventSet(WaitEventSetTracker_compat, eventSetSize);
|
||||
|
||||
WorkerSession *session = NULL;
|
||||
foreach_declared_ptr(session, sessionList)
|
||||
|
|
|
@ -759,9 +759,6 @@ citus_query_stats(PG_FUNCTION_ARGS)
|
|||
|
||||
LWLockRelease(queryStats->lock);
|
||||
|
||||
/* clean up and return the tuplestore */
|
||||
tuplestore_donestoring(tupstore);
|
||||
|
||||
return (Datum) 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -1475,7 +1475,7 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
|
|||
foreach_declared_ptr(action, query->mergeActionList)
|
||||
{
|
||||
/* Skip MATCHED clause as INSERTS are not allowed in it */
|
||||
if (action->matched)
|
||||
if (matched_compat(action))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -375,6 +375,21 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
|||
BufferUsage bufusage_start,
|
||||
bufusage;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
MemoryContextCounters mem_counters;
|
||||
MemoryContext planner_ctx = NULL;
|
||||
MemoryContext saved_ctx = NULL;
|
||||
|
||||
if (es->memory)
|
||||
{
|
||||
/* copy paste from postgres code */
|
||||
planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"explain analyze planner context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
saved_ctx = MemoryContextSwitchTo(planner_ctx);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (es->buffers)
|
||||
{
|
||||
bufusage_start = pgBufferUsage;
|
||||
|
@ -432,8 +447,20 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
|||
|
||||
ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es);
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
if (es->memory)
|
||||
{
|
||||
MemoryContextSwitchTo(saved_ctx);
|
||||
MemoryContextMemConsumed(planner_ctx, &mem_counters);
|
||||
}
|
||||
|
||||
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
|
||||
(es->buffers ? &bufusage : NULL),
|
||||
(es->memory ? &mem_counters : NULL));
|
||||
#else
|
||||
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
|
||||
(es->buffers ? &bufusage : NULL));
|
||||
#endif
|
||||
|
||||
ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es);
|
||||
ExplainCloseGroup("Subplan", NULL, true, es);
|
||||
|
@ -1253,6 +1280,21 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
|
|||
BufferUsage bufusage_start,
|
||||
bufusage;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
MemoryContextCounters mem_counters;
|
||||
MemoryContext planner_ctx = NULL;
|
||||
MemoryContext saved_ctx = NULL;
|
||||
|
||||
if (es->memory)
|
||||
{
|
||||
/* copy paste from postgres code */
|
||||
planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"explain analyze planner context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
saved_ctx = MemoryContextSwitchTo(planner_ctx);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (es->buffers)
|
||||
{
|
||||
bufusage_start = pgBufferUsage;
|
||||
|
@ -1286,9 +1328,23 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
|
|||
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
if (es->memory)
|
||||
{
|
||||
MemoryContextSwitchTo(saved_ctx);
|
||||
MemoryContextMemConsumed(planner_ctx, &mem_counters);
|
||||
}
|
||||
|
||||
/* run it (if needed) and produce output */
|
||||
ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
|
||||
&planduration, (es->buffers ? &bufusage : NULL),
|
||||
(es->memory ? &mem_counters : NULL));
|
||||
#else
|
||||
|
||||
/* run it (if needed) and produce output */
|
||||
ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
|
||||
&planduration, (es->buffers ? &bufusage : NULL));
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
@ -1701,6 +1757,21 @@ ExplainOneQuery(Query *query, int cursorOptions,
|
|||
BufferUsage bufusage_start,
|
||||
bufusage;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
MemoryContextCounters mem_counters;
|
||||
MemoryContext planner_ctx = NULL;
|
||||
MemoryContext saved_ctx = NULL;
|
||||
|
||||
if (es->memory)
|
||||
{
|
||||
/* copy paste from postgres code */
|
||||
planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"explain analyze planner context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
saved_ctx = MemoryContextSwitchTo(planner_ctx);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (es->buffers)
|
||||
bufusage_start = pgBufferUsage;
|
||||
INSTR_TIME_SET_CURRENT(planstart);
|
||||
|
@ -1718,9 +1789,21 @@ ExplainOneQuery(Query *query, int cursorOptions,
|
|||
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
if (es->memory)
|
||||
{
|
||||
MemoryContextSwitchTo(saved_ctx);
|
||||
MemoryContextMemConsumed(planner_ctx, &mem_counters);
|
||||
}
|
||||
/* run it (if needed) and produce output */
|
||||
ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
|
||||
&planduration, (es->buffers ? &bufusage : NULL),
|
||||
(es->memory ? &mem_counters : NULL));
|
||||
#else
|
||||
/* run it (if needed) and produce output */
|
||||
ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
|
||||
&planduration, (es->buffers ? &bufusage : NULL));
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -547,7 +547,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
|
|||
List *sortClauseList = NIL;
|
||||
Node *limitCount = NULL;
|
||||
Node *limitOffset = NULL;
|
||||
LimitOption limitOption = LIMIT_OPTION_DEFAULT;
|
||||
LimitOption limitOption = LIMIT_OPTION_COUNT;
|
||||
Node *havingQual = NULL;
|
||||
bool hasDistinctOn = false;
|
||||
List *distinctClause = NIL;
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
#include "utils/lsyscache.h"
|
||||
#include "utils/typcache.h"
|
||||
|
||||
#include "pg_version_constants.h"
|
||||
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
|
@ -134,6 +136,43 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
}
|
||||
|
||||
Oid targetRelationOid = InvalidOid;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
switch (change->action)
|
||||
{
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
{
|
||||
HeapTuple newTuple = change->data.tp.newtuple;
|
||||
targetRelationOid = FindTargetRelationOid(relation, newTuple,
|
||||
replicationSlotName);
|
||||
break;
|
||||
}
|
||||
|
||||
/* updating non-partition column value */
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
{
|
||||
HeapTuple newTuple = change->data.tp.newtuple;
|
||||
targetRelationOid = FindTargetRelationOid(relation, newTuple,
|
||||
replicationSlotName);
|
||||
break;
|
||||
}
|
||||
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
{
|
||||
HeapTuple oldTuple = change->data.tp.oldtuple;
|
||||
targetRelationOid = FindTargetRelationOid(relation, oldTuple,
|
||||
replicationSlotName);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
|
||||
default:
|
||||
ereport(ERROR, errmsg(
|
||||
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
|
||||
change->action));
|
||||
}
|
||||
#else
|
||||
switch (change->action)
|
||||
{
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
|
@ -168,6 +207,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
|
||||
change->action));
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Current replication slot is not responsible for handling the change */
|
||||
if (targetRelationOid == InvalidOid)
|
||||
|
@ -185,6 +225,62 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);
|
||||
if (sourceRelationDesc->natts > targetRelationDesc->natts)
|
||||
{
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
switch (change->action)
|
||||
{
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
{
|
||||
HeapTuple sourceRelationNewTuple = change->data.tp.newtuple;
|
||||
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
|
||||
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
|
||||
|
||||
change->data.tp.newtuple = targetRelationNewTuple;
|
||||
break;
|
||||
}
|
||||
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
{
|
||||
HeapTuple sourceRelationNewTuple = change->data.tp.newtuple;
|
||||
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
|
||||
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
|
||||
|
||||
change->data.tp.newtuple = targetRelationNewTuple;
|
||||
|
||||
/*
|
||||
* Format oldtuple according to the target relation. If the column values of replica
|
||||
* identiy change, then the old tuple is non-null and needs to be formatted according
|
||||
* to the target relation schema.
|
||||
*/
|
||||
if (change->data.tp.oldtuple != NULL)
|
||||
{
|
||||
HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple;
|
||||
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
|
||||
sourceRelationOldTuple,
|
||||
sourceRelationDesc,
|
||||
targetRelationDesc);
|
||||
|
||||
change->data.tp.oldtuple = targetRelationOldTuple;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
{
|
||||
HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple;
|
||||
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
|
||||
sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc);
|
||||
|
||||
change->data.tp.oldtuple = targetRelationOldTuple;
|
||||
break;
|
||||
}
|
||||
|
||||
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
|
||||
default:
|
||||
ereport(ERROR, errmsg(
|
||||
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
|
||||
change->action));
|
||||
}
|
||||
#else
|
||||
switch (change->action)
|
||||
{
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
|
@ -239,6 +335,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
|
||||
change->action));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
pgOutputPluginChangeCB(ctx, txn, targetRelation, change);
|
||||
|
|
|
@ -372,8 +372,13 @@ fake_vacuum(Relation onerel, VacuumParams *params,
|
|||
|
||||
|
||||
static bool
|
||||
fake_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
|
||||
fake_scan_analyze_next_block(TableScanDesc scan,
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
ReadStream *stream)
|
||||
#else
|
||||
BlockNumber blockno,
|
||||
BufferAccessStrategy bstrategy)
|
||||
#endif
|
||||
{
|
||||
/* we don't support analyze, so return false */
|
||||
return false;
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "optimizer/restrictinfo.h"
|
||||
#include "utils/palloc.h"
|
||||
|
||||
#include "pg_version_constants.h"
|
||||
#include "pg_version_compat.h"
|
||||
|
||||
/* local function forward declarations */
|
||||
static void FakeGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
|
||||
|
@ -91,9 +91,11 @@ FakeGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
|
|||
Cost startup_cost = 0;
|
||||
Cost total_cost = startup_cost + baserel->rows;
|
||||
|
||||
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, NULL, baserel->rows,
|
||||
startup_cost, total_cost, NIL,
|
||||
NULL, NULL, NIL));
|
||||
add_path(baserel, (Path *) create_foreignscan_path_compat(root, baserel, NULL,
|
||||
baserel->rows,
|
||||
startup_cost, total_cost,
|
||||
NIL,
|
||||
NULL, NULL, NIL, NIL));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
#include "storage/spin.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
#include "pg_version_constants.h"
|
||||
#include "pg_version_compat.h"
|
||||
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/connection_management.h"
|
||||
|
@ -700,7 +700,7 @@ InitializeBackendData(const char *applicationName)
|
|||
|
||||
uint64 gpid = ExtractGlobalPID(applicationName);
|
||||
|
||||
MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno];
|
||||
MyBackendData = &backendManagementShmemData->backends[getProcNo_compat(MyProc)];
|
||||
|
||||
Assert(MyBackendData);
|
||||
|
||||
|
@ -1174,11 +1174,11 @@ CurrentDistributedTransactionNumber(void)
|
|||
void
|
||||
GetBackendDataForProc(PGPROC *proc, BackendData *result)
|
||||
{
|
||||
int pgprocno = proc->pgprocno;
|
||||
int pgprocno = getProcNo_compat(proc);
|
||||
|
||||
if (proc->lockGroupLeader != NULL)
|
||||
{
|
||||
pgprocno = proc->lockGroupLeader->pgprocno;
|
||||
pgprocno = getProcNo_compat(proc->lockGroupLeader);
|
||||
}
|
||||
|
||||
BackendData *backendData = &backendManagementShmemData->backends[pgprocno];
|
||||
|
@ -1198,7 +1198,8 @@ GetBackendDataForProc(PGPROC *proc, BackendData *result)
|
|||
void
|
||||
CancelTransactionDueToDeadlock(PGPROC *proc)
|
||||
{
|
||||
BackendData *backendData = &backendManagementShmemData->backends[proc->pgprocno];
|
||||
BackendData *backendData = &backendManagementShmemData->backends[getProcNo_compat(
|
||||
proc)];
|
||||
|
||||
/* backend might not have used citus yet and thus not initialized backend data */
|
||||
if (!backendData)
|
||||
|
@ -1330,7 +1331,7 @@ ActiveDistributedTransactionNumbers(void)
|
|||
LocalTransactionId
|
||||
GetMyProcLocalTransactionId(void)
|
||||
{
|
||||
return MyProc->lxid;
|
||||
return getLxid_compat(MyProc);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -23,6 +23,8 @@
|
|||
#include "utils/hsearch.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
#include "pg_version_compat.h"
|
||||
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
|
@ -993,7 +995,7 @@ AllocWaitEdge(WaitGraph *waitGraph)
|
|||
static void
|
||||
AddProcToVisit(PROCStack *remaining, PGPROC *proc)
|
||||
{
|
||||
if (remaining->procAdded[proc->pgprocno])
|
||||
if (remaining->procAdded[getProcNo_compat(proc)])
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -1001,7 +1003,7 @@ AddProcToVisit(PROCStack *remaining, PGPROC *proc)
|
|||
Assert(remaining->procCount < TotalProcCount());
|
||||
|
||||
remaining->procs[remaining->procCount++] = proc;
|
||||
remaining->procAdded[proc->pgprocno] = true;
|
||||
remaining->procAdded[getProcNo_compat(proc)] = true;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -53,9 +53,6 @@ static const char *CitusNodeTagNamesD[] = {
|
|||
|
||||
const char **CitusNodeTagNames = CitusNodeTagNamesD;
|
||||
|
||||
/* support for CitusNewNode() macro */
|
||||
CitusNode *newCitusNodeMacroHolder;
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(citus_extradata_container);
|
||||
|
||||
|
|
|
@ -170,7 +170,8 @@ worker_adjust_identity_column_seq_ranges(PG_FUNCTION_ARGS)
|
|||
|
||||
if (attributeForm->attidentity)
|
||||
{
|
||||
Oid sequenceOid = getIdentitySequence(tableRelationId,
|
||||
Oid sequenceOid = getIdentitySequence(identitySequenceRelation_compat(
|
||||
tableRelation),
|
||||
attributeForm->attnum,
|
||||
missingSequenceOk);
|
||||
|
||||
|
|
|
@ -92,38 +92,21 @@ CitusNodeTagI(Node *node)
|
|||
return ((CitusNode*)(node))->citus_tag;
|
||||
}
|
||||
|
||||
/*
|
||||
* Postgres's nodes/nodes.h has more information on why we do this.
|
||||
*/
|
||||
#ifdef __GNUC__
|
||||
|
||||
/* Citus variant of newNode(), don't use directly. */
|
||||
#define CitusNewNode(size, tag) \
|
||||
({ CitusNode *_result; \
|
||||
AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \
|
||||
_result = (CitusNode *) palloc0fast(size); \
|
||||
_result->extensible.type = T_ExtensibleNode; \
|
||||
_result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \
|
||||
_result->citus_tag =(int) (tag); \
|
||||
_result; \
|
||||
})
|
||||
static inline CitusNode *
|
||||
CitusNewNode(size_t size, CitusNodeTag tag)
|
||||
{
|
||||
CitusNode *result;
|
||||
|
||||
#else
|
||||
|
||||
extern CitusNode *newCitusNodeMacroHolder;
|
||||
|
||||
#define CitusNewNode(size, tag) \
|
||||
( \
|
||||
AssertMacro((size) >= sizeof(CitusNode)), /* need the tag, at least */ \
|
||||
newCitusNodeMacroHolder = (CitusNode *) palloc0fast(size), \
|
||||
newCitusNodeMacroHolder->extensible.type = T_ExtensibleNode, \
|
||||
newCitusNodeMacroHolder->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START], \
|
||||
newCitusNodeMacroHolder->citus_tag =(int) (tag), \
|
||||
newCitusNodeMacroHolder \
|
||||
)
|
||||
|
||||
#endif
|
||||
Assert(size >= sizeof(CitusNode)); /* need the ExtensibleNode and the tag, at least */
|
||||
result = (CitusNode *) palloc0(size);
|
||||
result->extensible.type = T_ExtensibleNode;
|
||||
result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START];
|
||||
result->citus_tag = (int) (tag);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* IsA equivalent that compares node tags, including Citus-specific nodes.
|
||||
|
|
|
@ -13,6 +13,139 @@
|
|||
|
||||
#include "pg_version_constants.h"
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
|
||||
#include "catalog/pg_am.h"
|
||||
#include "catalog/pg_auth_members.h"
|
||||
#include "catalog/pg_authid.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "catalog/pg_collation.h"
|
||||
#include "catalog/pg_constraint.h"
|
||||
#include "catalog/pg_database.h"
|
||||
#include "catalog/pg_extension.h"
|
||||
#include "catalog/pg_foreign_server.h"
|
||||
#include "catalog/pg_namespace.h"
|
||||
#include "catalog/pg_parameter_acl.h"
|
||||
#include "catalog/pg_proc.h"
|
||||
#include "catalog/pg_publication.h"
|
||||
#include "catalog/pg_tablespace.h"
|
||||
#include "catalog/pg_transform.h"
|
||||
#include "catalog/pg_ts_config.h"
|
||||
#include "catalog/pg_ts_dict.h"
|
||||
#include "catalog/pg_ts_template.h"
|
||||
#include "catalog/pg_type.h"
|
||||
|
||||
typedef int ObjectClass;
|
||||
#define getObjectClass(a) a->classId
|
||||
#define LAST_OCLASS TransformRelationId
|
||||
#define OCLASS_ROLE AuthIdRelationId
|
||||
#define OCLASS_DATABASE DatabaseRelationId
|
||||
#define OCLASS_TBLSPACE TableSpaceRelationId
|
||||
#define OCLASS_PARAMETER_ACL ParameterAclRelationId
|
||||
#define OCLASS_ROLE_MEMBERSHIP AuthMemRelationId
|
||||
#define OCLASS_CLASS RelationRelationId
|
||||
#define OCLASS_COLLATION CollationRelationId
|
||||
#define OCLASS_CONSTRAINT ConstraintRelationId
|
||||
#define OCLASS_PROC ProcedureRelationId
|
||||
#define OCLASS_PUBLICATION PublicationRelationId
|
||||
#define OCLASS_SCHEMA NamespaceRelationId
|
||||
#define OCLASS_TSCONFIG TSConfigRelationId
|
||||
#define OCLASS_TSDICT TSDictionaryRelationId
|
||||
#define OCLASS_TYPE TypeRelationId
|
||||
#define OCLASS_EXTENSION ExtensionRelationId
|
||||
#define OCLASS_FOREIGN_SERVER ForeignServerRelationId
|
||||
#define OCLASS_AM AccessMethodRelationId
|
||||
#define OCLASS_TSTEMPLATE TSTemplateRelationId
|
||||
|
||||
#include "commands/tablecmds.h"
|
||||
|
||||
static inline void
|
||||
RangeVarCallbackOwnsTable(const RangeVar *relation,
|
||||
Oid relId, Oid oldRelId, void *arg)
|
||||
{
|
||||
return RangeVarCallbackMaintainsTable(relation, relId, oldRelId, arg);
|
||||
}
|
||||
|
||||
|
||||
#include "catalog/pg_attribute.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
static inline int
|
||||
getAttstattarget_compat(HeapTuple attTuple)
|
||||
{
|
||||
bool isnull;
|
||||
Datum dat = SysCacheGetAttr(ATTNUM, attTuple,
|
||||
Anum_pg_attribute_attstattarget, &isnull);
|
||||
return (isnull ? -1 : DatumGetInt16(dat));
|
||||
}
|
||||
|
||||
|
||||
#include "catalog/pg_statistic_ext.h"
|
||||
|
||||
static inline int
|
||||
getStxstattarget_compat(HeapTuple tup)
|
||||
{
|
||||
bool isnull;
|
||||
Datum dat = SysCacheGetAttr(STATEXTOID, tup,
|
||||
Anum_pg_statistic_ext_stxstattarget, &isnull);
|
||||
return (isnull ? -1 : DatumGetInt16(dat));
|
||||
}
|
||||
|
||||
|
||||
#define getAlterStatsStxstattarget_compat(a) ((Node *) makeInteger(a))
|
||||
#define getIntStxstattarget_compat(a) (intVal(a))
|
||||
|
||||
#define WaitEventSetTracker_compat CurrentResourceOwner
|
||||
|
||||
#define identitySequenceRelation_compat(a) (a)
|
||||
|
||||
#define matched_compat(a) (a->matchKind == MERGE_WHEN_MATCHED)
|
||||
|
||||
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, \
|
||||
k) create_foreignscan_path(a, b, c, d, e, f, g, h, \
|
||||
i, j, k)
|
||||
|
||||
#define getProcNo_compat(a) (a->vxid.procNumber)
|
||||
#define getLxid_compat(a) (a->vxid.lxid)
|
||||
|
||||
#else
|
||||
|
||||
#define Anum_pg_collation_colllocale Anum_pg_collation_colliculocale
|
||||
|
||||
#include "access/htup_details.h"
|
||||
static inline int
|
||||
getAttstattarget_compat(HeapTuple attTuple)
|
||||
{
|
||||
return ((Form_pg_attribute) GETSTRUCT(attTuple))->attstattarget;
|
||||
}
|
||||
|
||||
|
||||
#include "catalog/pg_statistic_ext.h"
|
||||
static inline int
|
||||
getStxstattarget_compat(HeapTuple tup)
|
||||
{
|
||||
return ((Form_pg_statistic_ext) GETSTRUCT(tup))->stxstattarget;
|
||||
}
|
||||
|
||||
|
||||
#define getAlterStatsStxstattarget_compat(a) (a)
|
||||
#define getIntStxstattarget_compat(a) (a)
|
||||
|
||||
#define WaitEventSetTracker_compat CurrentMemoryContext
|
||||
|
||||
#define identitySequenceRelation_compat(a) (RelationGetRelid(a))
|
||||
|
||||
#define matched_compat(a) (a->matched)
|
||||
|
||||
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, \
|
||||
k) create_foreignscan_path(a, b, c, d, e, f, g, h, \
|
||||
i, k)
|
||||
|
||||
#define getProcNo_compat(a) (a->pgprocno)
|
||||
#define getLxid_compat(a) (a->lxid)
|
||||
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_16
|
||||
|
||||
#include "utils/guc_tables.h"
|
||||
|
|
Loading…
Reference in New Issue