From da2624cee82acd1ab53435b215fbb67c190a9e96 Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:37:13 +0300 Subject: [PATCH] PG17 compatibility: Resolve compilation issues (#7699) This PR provides successful compilation against PG17.0. - Remove ExecFreeExprContext call Relevant PG commit d060e921ea5aa47b6265174c32e1128cebdbc3df https://github.com/postgres/postgres/commit/d060e921ea5aa47b6265174c32e1128cebdbc3df - PG17 uses streaming IO in analyze, fix scan_analyze_next_block function Relevant PG commit 041b96802efa33d2bc9456f2ad946976b92b5ae1 https://github.com/postgres/postgres/commit/041b96802efa33d2bc9456f2ad946976b92b5ae1 - Define ObjectClass for PG17+ only since it's removed Relevant PG commit: 89e5ef7e21812916c9cf9fcf56e45f0f74034656 https://github.com/postgres/postgres/commit/89e5ef7e21812916c9cf9fcf56e45f0f74034656 - Remove ReorderBufferTupleBuf structure. Relevant PG commit: 08e6344fd6423210b339e92c069bb979ba4e7cd6 https://github.com/postgres/postgres/commit/08e6344fd6423210b339e92c069bb979ba4e7cd6 - Define colliculocale and daticulocale since they have been renamed Relevant PG commit: f696c0cd5f299f1b51e214efc55a22a782cc175d https://github.com/postgres/postgres/commit/f696c0cd5f299f1b51e214efc55a22a782cc175d - makeStringConst defined in PG17 Relevant PG commit: de3600452b61d1bc3967e9e37e86db8956c8f577 https://github.com/postgres/postgres/commit/de3600452b61d1bc3967e9e37e86db8956c8f577 - RangeVarCallbackOwnsTable was replaced by RangeVarCallbackMaintainsTable Relevant PG commit: ecb0fd33720fab91df1207e85704f382f55e1eb7 https://github.com/postgres/postgres/commit/ecb0fd33720fab91df1207e85704f382f55e1eb7 - attstattarget is nullable, define pg compatible functions for it Relevant PG commit: 4f622503d6de975ac87448aea5cea7de4bc140d5 https://github.com/postgres/postgres/commit/4f622503d6de975ac87448aea5cea7de4bc140d5 - stxstattarget is nullable in PG17, write compat functions for it Relevant PG commit: 012460ee93c304fbc7220e5b55d9d0577fc766ab https://github.com/postgres/postgres/commit/012460ee93c304fbc7220e5b55d9d0577fc766ab - Use ResourceOwner to track WaitEventSet in PG17 Relevant PG commit: 50c67c2019ab9ade8aa8768bfe604cd802fe8591 https://github.com/postgres/postgres/commit/50c67c2019ab9ade8aa8768bfe604cd802fe8591 - getIdentitySequence now uses Relation instead of relation_id Relevant PG commit: 509199587df73f06eda898ae13284292f4ae573a https://github.com/postgres/postgres/commit/509199587df73f06eda898ae13284292f4ae573a - Remove no-op tuplestore_donestoring function Relevant PG commit: 75680c3d805e2323cd437ac567f0677fdfc7b680 https://github.com/postgres/postgres/commit/75680c3d805e2323cd437ac567f0677fdfc7b680 - MergeAction can have 3 merge kinds (now enum) in PG17, write compat Relevant PG commit: 0294df2f1f842dfb0eed79007b21016f486a3c6c https://github.com/postgres/postgres/commit/0294df2f1f842dfb0eed79007b21016f486a3c6c - EXPLAIN (MEMORY) is added, make changes to ExplainOnePlan Relevant PG commit: 5de890e3610d5a12cdaea36413d967cf5c544e20 https://github.com/postgres/postgres/commit/5de890e3610d5a12cdaea36413d967cf5c544e20 - LIMIT_OPTION_DEFAULT has been removed as it's useless, use LIMIT_OPTION_COUNT Relevant PG commit: a6be0600ac3b71dda8277ab0fcbe59ee101ac1ce https://github.com/postgres/postgres/commit/a6be0600ac3b71dda8277ab0fcbe59ee101ac1ce - write compat for create_foreignscan_path bcs of more arguments in PG17 Relevant PG commit: 9e9931d2bf40e2fea447d779c2e133c2c1256ef3 https://github.com/postgres/postgres/commit/9e9931d2bf40e2fea447d779c2e133c2c1256ef3 - pgprocno and lxid have been combined into a struct in PGPROC Relevant PG commits: 28f3915b73f75bd1b50ba070f56b34241fe53fd1 https://github.com/postgres/postgres/commit/28f3915b73f75bd1b50ba070f56b34241fe53fd1 ab355e3a88de745607f6dd4c21f0119b5c68f2ad https://github.com/postgres/postgres/commit/ab355e3a88de745607f6dd4c21f0119b5c68f2ad 024c521117579a6d356050ad3d78fdc95e44eefa https://github.com/postgres/postgres/commit/024c521117579a6d356050ad3d78fdc95e44eefa - Simplify CitusNewNode (#7434) postgres refactored newNode() in PG 17, the main point for doing this is the original tricks is no longer neccessary for modern compilers[1]. This does the same for Citus. This should have no backward compatibility issues since it just replaces palloc0fast with palloc0. This is good for forward compatibility since palloc0fast no longer exists in PG 17. [1] https://www.postgresql.org/message-id/b51f1fa7-7e6a-4ecc-936d-90a8a1659e7c@iki.fi (cherry picked from commit 4b295cc) --- src/backend/columnar/columnar_customscan.c | 5 - src/backend/columnar/columnar_tableam.c | 7 +- src/backend/distributed/cdc/cdc_decoder.c | 71 ++++++++++ src/backend/distributed/commands/collation.c | 16 +-- src/backend/distributed/commands/role.c | 7 + src/backend/distributed/commands/statistics.c | 11 +- .../connection/connection_management.c | 3 +- .../distributed/connection/remote_commands.c | 2 +- .../distributed/deparser/citus_ruleutils.c | 18 ++- .../deparser/deparse_statistics_stmts.c | 5 +- .../distributed/executor/adaptive_executor.c | 2 +- .../distributed/executor/query_stats.c | 3 - .../distributed/planner/merge_planner.c | 2 +- .../distributed/planner/multi_explain.c | 83 +++++++++++ .../planner/multi_physical_planner.c | 2 +- .../shardsplit/shardsplit_decoder.c | 97 +++++++++++++ src/backend/distributed/test/fake_am.c | 7 +- src/backend/distributed/test/fake_fdw.c | 10 +- .../distributed/transaction/backend_data.c | 13 +- .../distributed/transaction/lock_graph.c | 6 +- .../distributed/utils/citus_nodefuncs.c | 3 - .../worker/worker_data_fetch_protocol.c | 3 +- src/include/distributed/citus_nodes.h | 39 ++--- src/include/pg_version_compat.h | 133 ++++++++++++++++++ 24 files changed, 472 insertions(+), 76 deletions(-) diff --git a/src/backend/columnar/columnar_customscan.c b/src/backend/columnar/columnar_customscan.c index 698734bc9..c78485c66 100644 --- a/src/backend/columnar/columnar_customscan.c +++ b/src/backend/columnar/columnar_customscan.c @@ -1924,11 +1924,6 @@ ColumnarScan_EndCustomScan(CustomScanState *node) */ TableScanDesc scanDesc = node->ss.ss_currentScanDesc; - /* - * Free the exprcontext - */ - ExecFreeExprContext(&node->ss.ps); - /* * clean out the tuple table */ diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 6d31c2779..0e6c423c2 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -1424,8 +1424,13 @@ ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode, int timeout, static bool -columnar_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, +columnar_scan_analyze_next_block(TableScanDesc scan, +#if PG_VERSION_NUM >= PG_VERSION_17 + ReadStream *stream) +#else + BlockNumber blockno, BufferAccessStrategy bstrategy) +#endif { /* * Our access method is not pages based, i.e. tuples are not confined diff --git a/src/backend/distributed/cdc/cdc_decoder.c b/src/backend/distributed/cdc/cdc_decoder.c index cf9f4963b..1e71a82a1 100644 --- a/src/backend/distributed/cdc/cdc_decoder.c +++ b/src/backend/distributed/cdc/cdc_decoder.c @@ -22,6 +22,8 @@ #include "utils/rel.h" #include "utils/typcache.h" +#include "pg_version_constants.h" + PG_MODULE_MAGIC; extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); @@ -435,6 +437,74 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation return; } +#if PG_VERSION_NUM >= PG_VERSION_17 + + /* Check the ReorderBufferChange's action type and handle them accordingly.*/ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + /* For insert action, only new tuple should always be translated*/ + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple = targetRelationNewTuple; + break; + } + + /* + * For update changes both old and new tuples need to be translated for target relation + * if the REPLICA IDENTITY is set to FULL. Otherwise, only the new tuple needs to be + * translated for target relation. + */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + /* For update action, new tuple should always be translated*/ + /* Get the new tuple from the ReorderBufferChange, and translate it to target relation. */ + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple = targetRelationNewTuple; + + /* + * Format oldtuple according to the target relation. If the column values of replica + * identiy change, then the old tuple is non-null and needs to be formatted according + * to the target relation schema. + */ + if (change->data.tp.oldtuple != NULL) + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + } + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + /* For delete action, only old tuple should be translated*/ + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + break; + } + + default: + { + /* Do nothing for other action types. */ + break; + } + } +#else + /* Check the ReorderBufferChange's action type and handle them accordingly.*/ switch (change->action) { @@ -499,4 +569,5 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation break; } } +#endif } diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index 5ce3d1436..1a8c211f9 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -77,7 +77,7 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati * ICU-related field. Only the libc-related fields or the ICU-related field * is set, never both. */ - char *colliculocale; + char *colllocale; bool isnull; Datum datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collcollate, @@ -101,17 +101,17 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati collctype = NULL; } - datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_colliculocale, &isnull); + datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_colllocale, &isnull); if (!isnull) { - colliculocale = TextDatumGetCString(datum); + colllocale = TextDatumGetCString(datum); } else { - colliculocale = NULL; + colllocale = NULL; } - Assert((collcollate && collctype) || colliculocale); + Assert((collcollate && collctype) || colllocale); #else /* @@ -147,12 +147,12 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati *quotedCollationName, providerString); #if PG_VERSION_NUM >= PG_VERSION_15 - if (colliculocale) + if (colllocale) { appendStringInfo(&collationNameDef, ", locale = %s", - quote_literal_cstr(colliculocale)); - pfree(colliculocale); + quote_literal_cstr(colllocale)); + pfree(colllocale); } else { diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 05959ca3e..dd45c98d8 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -71,7 +71,9 @@ static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); static char * GetDatabaseNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); +#if PG_VERSION_NUM < PG_VERSION_17 static Node * makeStringConst(char *str, int location); +#endif static Node * makeIntConst(int val, int location); static Node * makeFloatConst(char *str, int location); static const char * WrapQueryInAlterRoleIfExistsCall(const char *query, RoleSpec *role); @@ -949,6 +951,8 @@ PreprocessCreateRoleStmt(Node *node, const char *queryString, } +#if PG_VERSION_NUM < PG_VERSION_17 + /* * makeStringConst creates a Const Node that stores a given string * @@ -972,6 +976,9 @@ makeStringConst(char *str, int location) } +#endif + + /* * makeIntConst creates a Const Node that stores a given integer * diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 45d79afe4..b43f6335e 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -651,14 +651,15 @@ GetAlterIndexStatisticsCommands(Oid indexOid) } Form_pg_attribute targetAttr = (Form_pg_attribute) GETSTRUCT(attTuple); - if (targetAttr->attstattarget != DEFAULT_STATISTICS_TARGET) + int32 targetAttstattarget = getAttstattarget_compat(attTuple); + if (targetAttstattarget != DEFAULT_STATISTICS_TARGET) { char *indexNameWithSchema = generate_qualified_relation_name(indexOid); char *command = GenerateAlterIndexColumnSetStatsCommand(indexNameWithSchema, targetAttr->attnum, - targetAttr->attstattarget); + targetAttstattarget); alterIndexStatisticsCommandList = lappend(alterIndexStatisticsCommandList, @@ -773,9 +774,10 @@ CreateAlterCommandIfTargetNotDefault(Oid statsOid) } Form_pg_statistic_ext statisticsForm = (Form_pg_statistic_ext) GETSTRUCT(tup); + int16 currentStxstattarget = getStxstattarget_compat(tup); ReleaseSysCache(tup); - if (statisticsForm->stxstattarget == -1) + if (currentStxstattarget == -1) { return NULL; } @@ -785,7 +787,8 @@ CreateAlterCommandIfTargetNotDefault(Oid statsOid) char *schemaName = get_namespace_name(statisticsForm->stxnamespace); char *statName = NameStr(statisticsForm->stxname); - alterStatsStmt->stxstattarget = statisticsForm->stxstattarget; + alterStatsStmt->stxstattarget = getAlterStatsStxstattarget_compat( + currentStxstattarget); alterStatsStmt->defnames = list_make2(makeString(schemaName), makeString(statName)); return DeparseAlterStatisticsStmt((Node *) alterStatsStmt); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index a8d8bad8a..4787d8f2f 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -866,7 +866,8 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount) *waitCount = 0; } - WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); + WaitEventSet *waitEventSet = CreateWaitEventSet(WaitEventSetTracker_compat, + eventSetSize); EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet), waitEventSet); diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 7a9e0601d..c9860c061 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1130,7 +1130,7 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, /* allocate pending connections + 2 for the signal latch and postmaster death */ /* (CreateWaitEventSet makes room for pgwin32_signal_event automatically) */ - WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, + WaitEventSet *waitEventSet = CreateWaitEventSet(WaitEventSetTracker_compat, pendingConnectionCount + 2); for (int connectionIndex = 0; connectionIndex < pendingConnectionCount; diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 3b387799b..530f6e720 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -395,7 +395,8 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults if (attributeForm->attidentity && includeIdentityDefaults) { bool missing_ok = false; - Oid seqOid = getIdentitySequence(RelationGetRelid(relation), + Oid seqOid = getIdentitySequence(identitySequenceRelation_compat( + relation), attributeForm->attnum, missing_ok); if (includeIdentityDefaults == INCLUDE_IDENTITY) @@ -738,7 +739,18 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) * If the user changed the column's statistics target, create * alter statement and add statement to a list for later processing. */ - if (attributeForm->attstattarget >= 0) + HeapTuple atttuple = SearchSysCache2(ATTNUM, + ObjectIdGetDatum(tableRelationId), + Int16GetDatum(attributeForm->attnum)); + if (!HeapTupleIsValid(atttuple)) + { + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + attributeForm->attnum, tableRelationId); + } + + int32 targetAttstattarget = getAttstattarget_compat(atttuple); + ReleaseSysCache(atttuple); + if (targetAttstattarget >= 0) { StringInfoData statement = { NULL, 0, 0, 0 }; initStringInfo(&statement); @@ -746,7 +758,7 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) appendStringInfo(&statement, "ALTER COLUMN %s ", quote_identifier(attributeName)); appendStringInfo(&statement, "SET STATISTICS %d", - attributeForm->attstattarget); + targetAttstattarget); columnOptionList = lappend(columnOptionList, statement.data); } diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index 4d7211939..79be835b9 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -177,8 +177,9 @@ AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt) static void AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt) { - appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d", NameListToQuotedString( - stmt->defnames), stmt->stxstattarget); + appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d", + NameListToQuotedString(stmt->defnames), + getIntStxstattarget_compat(stmt->stxstattarget)); } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f7e7c2337..9606cd724 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -4740,7 +4740,7 @@ BuildWaitEventSet(List *sessionList) int eventSetSize = GetEventSetSize(sessionList); WaitEventSet *waitEventSet = - CreateWaitEventSet(CurrentMemoryContext, eventSetSize); + CreateWaitEventSet(WaitEventSetTracker_compat, eventSetSize); WorkerSession *session = NULL; foreach_declared_ptr(session, sessionList) diff --git a/src/backend/distributed/executor/query_stats.c b/src/backend/distributed/executor/query_stats.c index f37a99bbf..ce6179b96 100644 --- a/src/backend/distributed/executor/query_stats.c +++ b/src/backend/distributed/executor/query_stats.c @@ -759,9 +759,6 @@ citus_query_stats(PG_FUNCTION_ARGS) LWLockRelease(queryStats->lock); - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - return (Datum) 0; } diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 66eaf71da..8048002e0 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -1475,7 +1475,7 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query) foreach_declared_ptr(action, query->mergeActionList) { /* Skip MATCHED clause as INSERTS are not allowed in it */ - if (action->matched) + if (matched_compat(action)) { continue; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 86a9f2a64..3f0120dae 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -375,6 +375,21 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) { bufusage_start = pgBufferUsage; @@ -432,8 +447,20 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es); +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) + { + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); + } + + ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration, + (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL)); +#else ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration, (es->buffers ? &bufusage : NULL)); +#endif ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es); ExplainCloseGroup("Subplan", NULL, true, es); @@ -1253,6 +1280,21 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) { bufusage_start = pgBufferUsage; @@ -1286,9 +1328,23 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); } +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) + { + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); + } + + /* run it (if needed) and produce output */ + ExplainOnePlan(plan, into, es, queryString, params, queryEnv, + &planduration, (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL)); +#else + /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, &planduration, (es->buffers ? &bufusage : NULL)); +#endif } @@ -1701,6 +1757,21 @@ ExplainOneQuery(Query *query, int cursorOptions, BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) bufusage_start = pgBufferUsage; INSTR_TIME_SET_CURRENT(planstart); @@ -1718,9 +1789,21 @@ ExplainOneQuery(Query *query, int cursorOptions, BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); } +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) + { + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); + } + /* run it (if needed) and produce output */ + ExplainOnePlan(plan, into, es, queryString, params, queryEnv, + &planduration, (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL)); +#else /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, &planduration, (es->buffers ? &bufusage : NULL)); +#endif } } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 2fb5b26e3..dee3464cf 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -547,7 +547,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList) List *sortClauseList = NIL; Node *limitCount = NULL; Node *limitOffset = NULL; - LimitOption limitOption = LIMIT_OPTION_DEFAULT; + LimitOption limitOption = LIMIT_OPTION_COUNT; Node *havingQual = NULL; bool hasDistinctOn = false; List *distinctClause = NIL; diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 841fa89cd..bcd25ce2e 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -14,6 +14,8 @@ #include "utils/lsyscache.h" #include "utils/typcache.h" +#include "pg_version_constants.h" + #include "distributed/listutils.h" #include "distributed/metadata/distobject.h" #include "distributed/shardinterval_utils.h" @@ -134,6 +136,43 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } Oid targetRelationOid = InvalidOid; + +#if PG_VERSION_NUM >= PG_VERSION_17 + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple newTuple = change->data.tp.newtuple; + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + + /* updating non-partition column value */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple newTuple = change->data.tp.newtuple; + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple oldTuple = change->data.tp.oldtuple; + targetRelationOid = FindTargetRelationOid(relation, oldTuple, + replicationSlotName); + + break; + } + + /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ + default: + ereport(ERROR, errmsg( + "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", + change->action)); + } +#else switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -168,6 +207,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", change->action)); } +#endif /* Current replication slot is not responsible for handling the change */ if (targetRelationOid == InvalidOid) @@ -185,6 +225,62 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc targetRelationDesc = RelationGetDescr(targetRelation); if (sourceRelationDesc->natts > targetRelationDesc->natts) { +#if PG_VERSION_NUM >= PG_VERSION_17 + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchema( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.newtuple = targetRelationNewTuple; + break; + } + + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchema( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.newtuple = targetRelationNewTuple; + + /* + * Format oldtuple according to the target relation. If the column values of replica + * identiy change, then the old tuple is non-null and needs to be formatted according + * to the target relation schema. + */ + if (change->data.tp.oldtuple != NULL) + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchema( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + } + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchema( + sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + break; + } + + /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ + default: + ereport(ERROR, errmsg( + "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", + change->action)); + } +#else switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -239,6 +335,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", change->action)); } +#endif } pgOutputPluginChangeCB(ctx, txn, targetRelation, change); diff --git a/src/backend/distributed/test/fake_am.c b/src/backend/distributed/test/fake_am.c index cff124961..928051942 100644 --- a/src/backend/distributed/test/fake_am.c +++ b/src/backend/distributed/test/fake_am.c @@ -372,8 +372,13 @@ fake_vacuum(Relation onerel, VacuumParams *params, static bool -fake_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, +fake_scan_analyze_next_block(TableScanDesc scan, +#if PG_VERSION_NUM >= PG_VERSION_17 + ReadStream *stream) +#else + BlockNumber blockno, BufferAccessStrategy bstrategy) +#endif { /* we don't support analyze, so return false */ return false; diff --git a/src/backend/distributed/test/fake_fdw.c b/src/backend/distributed/test/fake_fdw.c index 585e61d41..90b205b1e 100644 --- a/src/backend/distributed/test/fake_fdw.c +++ b/src/backend/distributed/test/fake_fdw.c @@ -29,7 +29,7 @@ #include "optimizer/restrictinfo.h" #include "utils/palloc.h" -#include "pg_version_constants.h" +#include "pg_version_compat.h" /* local function forward declarations */ static void FakeGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, @@ -91,9 +91,11 @@ FakeGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid) Cost startup_cost = 0; Cost total_cost = startup_cost + baserel->rows; - add_path(baserel, (Path *) create_foreignscan_path(root, baserel, NULL, baserel->rows, - startup_cost, total_cost, NIL, - NULL, NULL, NIL)); + add_path(baserel, (Path *) create_foreignscan_path_compat(root, baserel, NULL, + baserel->rows, + startup_cost, total_cost, + NIL, + NULL, NULL, NIL, NIL)); } diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index a0584bde8..f23d58bd0 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -33,7 +33,7 @@ #include "storage/spin.h" #include "utils/timestamp.h" -#include "pg_version_constants.h" +#include "pg_version_compat.h" #include "distributed/backend_data.h" #include "distributed/connection_management.h" @@ -700,7 +700,7 @@ InitializeBackendData(const char *applicationName) uint64 gpid = ExtractGlobalPID(applicationName); - MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno]; + MyBackendData = &backendManagementShmemData->backends[getProcNo_compat(MyProc)]; Assert(MyBackendData); @@ -1174,11 +1174,11 @@ CurrentDistributedTransactionNumber(void) void GetBackendDataForProc(PGPROC *proc, BackendData *result) { - int pgprocno = proc->pgprocno; + int pgprocno = getProcNo_compat(proc); if (proc->lockGroupLeader != NULL) { - pgprocno = proc->lockGroupLeader->pgprocno; + pgprocno = getProcNo_compat(proc->lockGroupLeader); } BackendData *backendData = &backendManagementShmemData->backends[pgprocno]; @@ -1198,7 +1198,8 @@ GetBackendDataForProc(PGPROC *proc, BackendData *result) void CancelTransactionDueToDeadlock(PGPROC *proc) { - BackendData *backendData = &backendManagementShmemData->backends[proc->pgprocno]; + BackendData *backendData = &backendManagementShmemData->backends[getProcNo_compat( + proc)]; /* backend might not have used citus yet and thus not initialized backend data */ if (!backendData) @@ -1330,7 +1331,7 @@ ActiveDistributedTransactionNumbers(void) LocalTransactionId GetMyProcLocalTransactionId(void) { - return MyProc->lxid; + return getLxid_compat(MyProc); } diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index a224c29e1..dadcfe0a4 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -23,6 +23,8 @@ #include "utils/hsearch.h" #include "utils/timestamp.h" +#include "pg_version_compat.h" + #include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" @@ -993,7 +995,7 @@ AllocWaitEdge(WaitGraph *waitGraph) static void AddProcToVisit(PROCStack *remaining, PGPROC *proc) { - if (remaining->procAdded[proc->pgprocno]) + if (remaining->procAdded[getProcNo_compat(proc)]) { return; } @@ -1001,7 +1003,7 @@ AddProcToVisit(PROCStack *remaining, PGPROC *proc) Assert(remaining->procCount < TotalProcCount()); remaining->procs[remaining->procCount++] = proc; - remaining->procAdded[proc->pgprocno] = true; + remaining->procAdded[getProcNo_compat(proc)] = true; } diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 0b03926f8..076e8ce6a 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -53,9 +53,6 @@ static const char *CitusNodeTagNamesD[] = { const char **CitusNodeTagNames = CitusNodeTagNamesD; -/* support for CitusNewNode() macro */ -CitusNode *newCitusNodeMacroHolder; - /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(citus_extradata_container); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index d2b60aa50..0370001ee 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -170,7 +170,8 @@ worker_adjust_identity_column_seq_ranges(PG_FUNCTION_ARGS) if (attributeForm->attidentity) { - Oid sequenceOid = getIdentitySequence(tableRelationId, + Oid sequenceOid = getIdentitySequence(identitySequenceRelation_compat( + tableRelation), attributeForm->attnum, missingSequenceOk); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 888133a89..16df367aa 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -92,38 +92,21 @@ CitusNodeTagI(Node *node) return ((CitusNode*)(node))->citus_tag; } -/* - * Postgres's nodes/nodes.h has more information on why we do this. - */ -#ifdef __GNUC__ /* Citus variant of newNode(), don't use directly. */ -#define CitusNewNode(size, tag) \ -({ CitusNode *_result; \ - AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \ - _result = (CitusNode *) palloc0fast(size); \ - _result->extensible.type = T_ExtensibleNode; \ - _result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \ - _result->citus_tag =(int) (tag); \ - _result; \ -}) +static inline CitusNode * +CitusNewNode(size_t size, CitusNodeTag tag) +{ + CitusNode *result; -#else - -extern CitusNode *newCitusNodeMacroHolder; - -#define CitusNewNode(size, tag) \ -( \ - AssertMacro((size) >= sizeof(CitusNode)), /* need the tag, at least */ \ - newCitusNodeMacroHolder = (CitusNode *) palloc0fast(size), \ - newCitusNodeMacroHolder->extensible.type = T_ExtensibleNode, \ - newCitusNodeMacroHolder->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START], \ - newCitusNodeMacroHolder->citus_tag =(int) (tag), \ - newCitusNodeMacroHolder \ -) - -#endif + Assert(size >= sizeof(CitusNode)); /* need the ExtensibleNode and the tag, at least */ + result = (CitusNode *) palloc0(size); + result->extensible.type = T_ExtensibleNode; + result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; + result->citus_tag = (int) (tag); + return result; +} /* * IsA equivalent that compares node tags, including Citus-specific nodes. diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 4e874e2ee..80c4e9d3d 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -13,6 +13,139 @@ #include "pg_version_constants.h" +#if PG_VERSION_NUM >= PG_VERSION_17 + +#include "catalog/pg_am.h" +#include "catalog/pg_auth_members.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_class.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_constraint.h" +#include "catalog/pg_database.h" +#include "catalog/pg_extension.h" +#include "catalog/pg_foreign_server.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_parameter_acl.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_publication.h" +#include "catalog/pg_tablespace.h" +#include "catalog/pg_transform.h" +#include "catalog/pg_ts_config.h" +#include "catalog/pg_ts_dict.h" +#include "catalog/pg_ts_template.h" +#include "catalog/pg_type.h" + +typedef int ObjectClass; +#define getObjectClass(a) a->classId +#define LAST_OCLASS TransformRelationId +#define OCLASS_ROLE AuthIdRelationId +#define OCLASS_DATABASE DatabaseRelationId +#define OCLASS_TBLSPACE TableSpaceRelationId +#define OCLASS_PARAMETER_ACL ParameterAclRelationId +#define OCLASS_ROLE_MEMBERSHIP AuthMemRelationId +#define OCLASS_CLASS RelationRelationId +#define OCLASS_COLLATION CollationRelationId +#define OCLASS_CONSTRAINT ConstraintRelationId +#define OCLASS_PROC ProcedureRelationId +#define OCLASS_PUBLICATION PublicationRelationId +#define OCLASS_SCHEMA NamespaceRelationId +#define OCLASS_TSCONFIG TSConfigRelationId +#define OCLASS_TSDICT TSDictionaryRelationId +#define OCLASS_TYPE TypeRelationId +#define OCLASS_EXTENSION ExtensionRelationId +#define OCLASS_FOREIGN_SERVER ForeignServerRelationId +#define OCLASS_AM AccessMethodRelationId +#define OCLASS_TSTEMPLATE TSTemplateRelationId + +#include "commands/tablecmds.h" + +static inline void +RangeVarCallbackOwnsTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + return RangeVarCallbackMaintainsTable(relation, relId, oldRelId, arg); +} + + +#include "catalog/pg_attribute.h" +#include "utils/syscache.h" + +static inline int +getAttstattarget_compat(HeapTuple attTuple) +{ + bool isnull; + Datum dat = SysCacheGetAttr(ATTNUM, attTuple, + Anum_pg_attribute_attstattarget, &isnull); + return (isnull ? -1 : DatumGetInt16(dat)); +} + + +#include "catalog/pg_statistic_ext.h" + +static inline int +getStxstattarget_compat(HeapTuple tup) +{ + bool isnull; + Datum dat = SysCacheGetAttr(STATEXTOID, tup, + Anum_pg_statistic_ext_stxstattarget, &isnull); + return (isnull ? -1 : DatumGetInt16(dat)); +} + + +#define getAlterStatsStxstattarget_compat(a) ((Node *) makeInteger(a)) +#define getIntStxstattarget_compat(a) (intVal(a)) + +#define WaitEventSetTracker_compat CurrentResourceOwner + +#define identitySequenceRelation_compat(a) (a) + +#define matched_compat(a) (a->matchKind == MERGE_WHEN_MATCHED) + +#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, \ + k) create_foreignscan_path(a, b, c, d, e, f, g, h, \ + i, j, k) + +#define getProcNo_compat(a) (a->vxid.procNumber) +#define getLxid_compat(a) (a->vxid.lxid) + +#else + +#define Anum_pg_collation_colllocale Anum_pg_collation_colliculocale + +#include "access/htup_details.h" +static inline int +getAttstattarget_compat(HeapTuple attTuple) +{ + return ((Form_pg_attribute) GETSTRUCT(attTuple))->attstattarget; +} + + +#include "catalog/pg_statistic_ext.h" +static inline int +getStxstattarget_compat(HeapTuple tup) +{ + return ((Form_pg_statistic_ext) GETSTRUCT(tup))->stxstattarget; +} + + +#define getAlterStatsStxstattarget_compat(a) (a) +#define getIntStxstattarget_compat(a) (a) + +#define WaitEventSetTracker_compat CurrentMemoryContext + +#define identitySequenceRelation_compat(a) (RelationGetRelid(a)) + +#define matched_compat(a) (a->matched) + +#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, \ + k) create_foreignscan_path(a, b, c, d, e, f, g, h, \ + i, k) + +#define getProcNo_compat(a) (a->pgprocno) +#define getLxid_compat(a) (a->lxid) + +#endif + #if PG_VERSION_NUM >= PG_VERSION_16 #include "utils/guc_tables.h"