Merge pull request #6022 from citusdata/baby_step_pg_15

PG 15 compatibility: Resolve compile issues + adjust shmem requests for
pull/5981/head
Önder Kalacı 2022-07-18 17:48:50 +02:00 committed by GitHub
commit 9d5ca41e8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 105 additions and 35 deletions

View File

@ -83,7 +83,7 @@ PreprocessGrantOnFDWStmt(Node *node, const char *queryString,
static bool static bool
NameListHasFDWOwnedByDistributedExtension(List *FDWNames) NameListHasFDWOwnedByDistributedExtension(List *FDWNames)
{ {
Value *FDWValue = NULL; String *FDWValue = NULL;
foreach_ptr(FDWValue, FDWNames) foreach_ptr(FDWValue, FDWNames)
{ {
/* captures the extension address during lookup */ /* captures the extension address during lookup */

View File

@ -2038,7 +2038,7 @@ FilterDistributedFunctions(GrantStmt *grantStmt)
List *namespaceOidList = NIL; List *namespaceOidList = NIL;
/* iterate over all namespace names provided to get their oid's */ /* iterate over all namespace names provided to get their oid's */
Value *namespaceValue = NULL; String *namespaceValue = NULL;
foreach_ptr(namespaceValue, grantStmt->objects) foreach_ptr(namespaceValue, grantStmt->objects)
{ {
char *nspname = strVal(namespaceValue); char *nspname = strVal(namespaceValue);

View File

@ -612,8 +612,8 @@ RenamePolicyEventExtendNames(RenameStmt *stmt, const char *schemaName, uint64 sh
void void
DropPolicyEventExtendNames(DropStmt *dropStmt, const char *schemaName, uint64 shardId) DropPolicyEventExtendNames(DropStmt *dropStmt, const char *schemaName, uint64 shardId)
{ {
Value *relationSchemaNameValue = NULL; String *relationSchemaNameValue = NULL;
Value *relationNameValue = NULL; String *relationNameValue = NULL;
uint32 dropCount = list_length(dropStmt->objects); uint32 dropCount = list_length(dropStmt->objects);
if (dropCount > 1) if (dropCount > 1)
@ -652,10 +652,10 @@ DropPolicyEventExtendNames(DropStmt *dropStmt, const char *schemaName, uint64 sh
/* prefix with schema name if it is not added already */ /* prefix with schema name if it is not added already */
if (relationSchemaNameValue == NULL) if (relationSchemaNameValue == NULL)
{ {
Value *schemaNameValue = makeString(pstrdup(schemaName)); String *schemaNameValue = makeString(pstrdup(schemaName));
relationNameList = lcons(schemaNameValue, relationNameList); relationNameList = lcons(schemaNameValue, relationNameList);
} }
char **relationName = &(relationNameValue->val.str); char **relationName = &(strVal(relationNameValue));
AppendShardIdToName(relationName, shardId); AppendShardIdToName(relationName, shardId);
} }

View File

@ -825,7 +825,7 @@ FilterDistributedSequences(GrantStmt *stmt)
{ {
/* iterate over all namespace names provided to get their oid's */ /* iterate over all namespace names provided to get their oid's */
List *namespaceOidList = NIL; List *namespaceOidList = NIL;
Value *namespaceValue = NULL; String *namespaceValue = NULL;
foreach_ptr(namespaceValue, stmt->objects) foreach_ptr(namespaceValue, stmt->objects)
{ {
char *nspname = strVal(namespaceValue); char *nspname = strVal(namespaceValue);

View File

@ -103,7 +103,7 @@ GenerateConninfoWithAuth(char *conninfo)
} }
else if (strcmp(option->keyword, "port") == 0) else if (strcmp(option->keyword, "port") == 0)
{ {
port = pg_atoi(option->val, 4, 0); port = pg_strtoint32(option->val);
} }
else if (strcmp(option->keyword, "user") == 0) else if (strcmp(option->keyword, "user") == 0)
{ {

View File

@ -455,7 +455,7 @@ GetEffectiveConnKey(ConnectionHashKey *key)
} }
else if (strcmp(option->keyword, "port") == 0) else if (strcmp(option->keyword, "port") == 0)
{ {
effectiveKey->port = pg_atoi(option->val, 4, 0); effectiveKey->port = pg_strtoint32(option->val);
} }
else if (strcmp(option->keyword, "dbname") == 0) else if (strcmp(option->keyword, "dbname") == 0)
{ {

View File

@ -123,8 +123,6 @@ static void StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor); tupleDescriptor);
static void LockConnectionSharedMemory(LWLockMode lockMode); static void LockConnectionSharedMemory(LWLockMode lockMode);
static void UnLockConnectionSharedMemory(void); static void UnLockConnectionSharedMemory(void);
static void SharedConnectionStatsShmemInit(void);
static size_t SharedConnectionStatsShmemSize(void);
static bool ShouldWaitForConnection(int currentConnectionCount); static bool ShouldWaitForConnection(int currentConnectionCount);
static uint32 SharedConnectionHashHash(const void *key, Size keysize); static uint32 SharedConnectionHashHash(const void *key, Size keysize);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
@ -617,11 +615,15 @@ WaitForSharedConnection(void)
void void
InitializeSharedConnectionStats(void) InitializeSharedConnectionStats(void)
{ {
/* on PG 15, we use shmem_request_hook_type */
#if PG_VERSION_NUM < PG_VERSION_15
/* allocate shared memory */ /* allocate shared memory */
if (!IsUnderPostmaster) if (!IsUnderPostmaster)
{ {
RequestAddinShmemSpace(SharedConnectionStatsShmemSize()); RequestAddinShmemSpace(SharedConnectionStatsShmemSize());
} }
#endif
prev_shmem_startup_hook = shmem_startup_hook; prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = SharedConnectionStatsShmemInit; shmem_startup_hook = SharedConnectionStatsShmemInit;
@ -632,7 +634,7 @@ InitializeSharedConnectionStats(void)
* SharedConnectionStatsShmemSize returns the size that should be allocated * SharedConnectionStatsShmemSize returns the size that should be allocated
* on the shared memory for shared connection stats. * on the shared memory for shared connection stats.
*/ */
static size_t size_t
SharedConnectionStatsShmemSize(void) SharedConnectionStatsShmemSize(void)
{ {
Size size = 0; Size size = 0;
@ -652,7 +654,7 @@ SharedConnectionStatsShmemSize(void)
* SharedConnectionStatsShmemInit initializes the shared memory used * SharedConnectionStatsShmemInit initializes the shared memory used
* for keeping track of connection stats across backends. * for keeping track of connection stats across backends.
*/ */
static void void
SharedConnectionStatsShmemInit(void) SharedConnectionStatsShmemInit(void)
{ {
bool alreadyInitialized = false; bool alreadyInitialized = false;

View File

@ -49,7 +49,6 @@
#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */ #define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */
#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */ #define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */
#define USAGE_INIT (1.0) /* including initial planning */ #define USAGE_INIT (1.0) /* including initial planning */
#define STATS_SHARED_MEM_NAME "citus_query_stats"
#define MAX_KEY_LENGTH NAMEDATALEN #define MAX_KEY_LENGTH NAMEDATALEN
@ -124,7 +123,6 @@ PG_FUNCTION_INFO_V1(citus_executor_name);
static char * CitusExecutorName(MultiExecutorType executorType); static char * CitusExecutorName(MultiExecutorType executorType);
static Size CitusQueryStatsSharedMemSize(void);
static void CitusQueryStatsShmemStartup(void); static void CitusQueryStatsShmemStartup(void);
static void CitusQueryStatsShmemShutdown(int code, Datum arg); static void CitusQueryStatsShmemShutdown(int code, Datum arg);
@ -143,10 +141,18 @@ static void CitusQueryStatsRemoveExpiredEntries(HTAB *existingQueryIdHash);
void void
InitializeCitusQueryStats(void) InitializeCitusQueryStats(void)
{ {
/* on PG 15, we use shmem_request_hook_type */
#if PG_VERSION_NUM < PG_VERSION_15
/* allocate shared memory */
if (!IsUnderPostmaster)
{
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize()); RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
elog(LOG, "requesting named LWLockTranch for %s", STATS_SHARED_MEM_NAME); elog(LOG, "requesting named LWLockTranch for %s", STATS_SHARED_MEM_NAME);
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1); RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
}
#endif
/* Install hook */ /* Install hook */
prev_shmem_startup_hook = shmem_startup_hook; prev_shmem_startup_hook = shmem_startup_hook;
@ -373,7 +379,7 @@ error:
* CitusQueryStatsSharedMemSize calculates and returns shared memory size * CitusQueryStatsSharedMemSize calculates and returns shared memory size
* required to keep query statistics. * required to keep query statistics.
*/ */
static Size Size
CitusQueryStatsSharedMemSize(void) CitusQueryStatsSharedMemSize(void)
{ {
Assert(StatStatementsMax >= 0); Assert(StatStatementsMax >= 0);
@ -947,7 +953,7 @@ GetPGStatStatementsMax(void)
*/ */
if (pgssMax) if (pgssMax)
{ {
maxValue = pg_atoi(pgssMax, 4, 0); maxValue = pg_strtoint32(pgssMax);
} }
return maxValue; return maxValue;

View File

@ -33,6 +33,7 @@
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
@ -1808,7 +1809,7 @@ TotalRelationSizeForSubscription(MultiConnection *connection, char *command)
{ {
char *resultString = PQgetvalue(result, 0, 0); char *resultString = PQgetvalue(result, 0, 0);
remoteTotalSize = pg_strtouint64(resultString, NULL, 10); remoteTotalSize = SafeStringToUint64(resultString);
} }
else else
{ {

View File

@ -141,6 +141,7 @@ DEFINE_COLUMNAR_PASSTHROUGH_FUNC(test_columnar_storage_write_new_page)
#define DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE 9999999 #define DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE 9999999
static char *CitusVersion = CITUS_VERSION; static char *CitusVersion = CITUS_VERSION;
static char *DeprecatedEmptyString = "";
/* deprecated GUC value that should not be used anywhere outside this file */ /* deprecated GUC value that should not be used anywhere outside this file */
static int ReplicationModel = REPLICATION_MODEL_STREAMING; static int ReplicationModel = REPLICATION_MODEL_STREAMING;
@ -150,8 +151,15 @@ static GucStringAssignHook OldApplicationNameAssignHook = NULL;
static object_access_hook_type PrevObjectAccessHook = NULL; static object_access_hook_type PrevObjectAccessHook = NULL;
#if PG_VERSION_NUM >= PG_VERSION_15
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
void _PG_init(void); void _PG_init(void);
#if PG_VERSION_NUM >= PG_VERSION_15
static void citus_shmem_request(void);
#endif
static void CitusObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int static void CitusObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int
subId, void *arg); subId, void *arg);
static void DoInitialCleanup(void); static void DoInitialCleanup(void);
@ -368,6 +376,11 @@ _PG_init(void)
original_client_auth_hook = ClientAuthentication_hook; original_client_auth_hook = ClientAuthentication_hook;
ClientAuthentication_hook = CitusAuthHook; ClientAuthentication_hook = CitusAuthHook;
#if PG_VERSION_NUM >= PG_VERSION_15
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = citus_shmem_request;
#endif
InitializeMaintenanceDaemon(); InitializeMaintenanceDaemon();
/* initialize coordinated transaction management */ /* initialize coordinated transaction management */
@ -400,6 +413,7 @@ _PG_init(void)
PrevObjectAccessHook = object_access_hook; PrevObjectAccessHook = object_access_hook;
object_access_hook = CitusObjectAccessHook; object_access_hook = CitusObjectAccessHook;
/* ensure columnar module is loaded at the right time */ /* ensure columnar module is loaded at the right time */
load_file(COLUMNAR_MODULE_NAME, false); load_file(COLUMNAR_MODULE_NAME, false);
@ -442,6 +456,30 @@ _PG_init(void)
} }
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* Requests any additional shared memory required for citus.
*/
static void
citus_shmem_request(void)
{
if (prev_shmem_request_hook)
{
prev_shmem_request_hook();
}
RequestAddinShmemSpace(BackendManagementShmemSize());
RequestAddinShmemSpace(SharedConnectionStatsShmemSize());
RequestAddinShmemSpace(MaintenanceDaemonShmemSize());
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
}
#endif
/* /*
* DoInitialCleanup does cleanup at start time. * DoInitialCleanup does cleanup at start time.
* Currently it: * Currently it:
@ -1234,6 +1272,26 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
/*
* This was a GUC we added on Citus 11.0.1, and
* replaced with another name on 11.0.2 via #5920.
* However, as this GUC has been used in
* citus_shard_indexes_on_worker-11.0.1
* script. So, it is not easy to completely get rid
* of the GUC. Especially with PG 15+, Postgres verifies
* existence of the GUCs that are used. So, without this
* CREATE EXTENSION fails.
*/
DefineCustomStringVariable(
"citus.hide_shards_from_app_name_prefixes",
gettext_noop("Deprecated, use citus.show_shards_for_app_name_prefixes"),
NULL,
&DeprecatedEmptyString,
"",
PGC_SUSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.isolation_test_session_process_id", "citus.isolation_test_session_process_id",
NULL, NULL,

View File

@ -104,8 +104,6 @@ static BackendData *MyBackendData = NULL;
static CitusBackendType CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED; static CitusBackendType CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED;
static void BackendManagementShmemInit(void);
static size_t BackendManagementShmemSize(void);
static void DetermineCitusBackendType(void); static void DetermineCitusBackendType(void);
@ -515,12 +513,15 @@ UserHasPermissionToViewStatsOf(Oid currentUserId, Oid backendOwnedId)
void void
InitializeBackendManagement(void) InitializeBackendManagement(void)
{ {
/* on PG 15, we use shmem_request_hook_type */
#if PG_VERSION_NUM < PG_VERSION_15
/* allocate shared memory */ /* allocate shared memory */
if (!IsUnderPostmaster) if (!IsUnderPostmaster)
{ {
RequestAddinShmemSpace(BackendManagementShmemSize()); RequestAddinShmemSpace(BackendManagementShmemSize());
} }
#endif
prev_shmem_startup_hook = shmem_startup_hook; prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = BackendManagementShmemInit; shmem_startup_hook = BackendManagementShmemInit;
} }
@ -531,7 +532,7 @@ InitializeBackendManagement(void)
* memory startup hook. The function sets up the necessary shared memory * memory startup hook. The function sets up the necessary shared memory
* segment for the backend manager. * segment for the backend manager.
*/ */
static void void
BackendManagementShmemInit(void) BackendManagementShmemInit(void)
{ {
bool alreadyInitialized = false; bool alreadyInitialized = false;
@ -599,7 +600,7 @@ BackendManagementShmemInit(void)
* BackendManagementShmemSize returns the size that should be allocated * BackendManagementShmemSize returns the size that should be allocated
* on the shared memory for backend management. * on the shared memory for backend management.
*/ */
static size_t size_t
BackendManagementShmemSize(void) BackendManagementShmemSize(void)
{ {
Size size = 0; Size size = 0;

View File

@ -117,8 +117,6 @@ static bool IsMaintenanceDaemon = false;
static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS); static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS);
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS); static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
static size_t MaintenanceDaemonShmemSize(void);
static void MaintenanceDaemonShmemInit(void);
static void MaintenanceDaemonShmemExit(int code, Datum arg); static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg); static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
@ -133,11 +131,6 @@ static void WarnMaintenanceDaemonNotStarted(void);
void void
InitializeMaintenanceDaemon(void) InitializeMaintenanceDaemon(void)
{ {
if (!IsUnderPostmaster)
{
RequestAddinShmemSpace(MaintenanceDaemonShmemSize());
}
prev_shmem_startup_hook = shmem_startup_hook; prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = MaintenanceDaemonShmemInit; shmem_startup_hook = MaintenanceDaemonShmemInit;
} }
@ -743,7 +736,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
/* /*
* MaintenanceDaemonShmemSize computes how much shared memory is required. * MaintenanceDaemonShmemSize computes how much shared memory is required.
*/ */
static size_t size_t
MaintenanceDaemonShmemSize(void) MaintenanceDaemonShmemSize(void)
{ {
Size size = 0; Size size = 0;
@ -767,7 +760,7 @@ MaintenanceDaemonShmemSize(void)
* MaintenanceDaemonShmemInit initializes the requested shared memory for the * MaintenanceDaemonShmemInit initializes the requested shared memory for the
* maintenance daemon. * maintenance daemon.
*/ */
static void void
MaintenanceDaemonShmemInit(void) MaintenanceDaemonShmemInit(void)
{ {
bool alreadyInitialized = false; bool alreadyInitialized = false;

View File

@ -45,6 +45,8 @@ typedef struct BackendData
} BackendData; } BackendData;
extern void BackendManagementShmemInit(void);
extern size_t BackendManagementShmemSize(void);
extern void InitializeBackendManagement(void); extern void InitializeBackendManagement(void);
extern int TotalProcCount(void); extern int TotalProcCount(void);
extern void InitializeBackendData(void); extern void InitializeBackendData(void);

View File

@ -24,6 +24,8 @@ extern double DistributedDeadlockDetectionTimeoutFactor;
extern void StopMaintenanceDaemon(Oid databaseId); extern void StopMaintenanceDaemon(Oid databaseId);
extern void TriggerNodeMetadataSync(Oid databaseId); extern void TriggerNodeMetadataSync(Oid databaseId);
extern void InitializeMaintenanceDaemon(void); extern void InitializeMaintenanceDaemon(void);
extern size_t MaintenanceDaemonShmemSize(void);
extern void MaintenanceDaemonShmemInit(void);
extern void InitializeMaintenanceDaemonBackend(void); extern void InitializeMaintenanceDaemonBackend(void);
extern bool LockCitusExtension(void); extern bool LockCitusExtension(void);

View File

@ -12,6 +12,9 @@
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#define STATS_SHARED_MEM_NAME "citus_query_stats"
extern Size CitusQueryStatsSharedMemSize(void);
extern void InitializeCitusQueryStats(void); extern void InitializeCitusQueryStats(void);
extern void CitusQueryStatsExecutorsEntry(uint64 queryId, MultiExecutorType executorType, extern void CitusQueryStatsExecutorsEntry(uint64 queryId, MultiExecutorType executorType,
char *partitionKey); char *partitionKey);

View File

@ -25,6 +25,8 @@ extern int MaxClientConnections;
extern void InitializeSharedConnectionStats(void); extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void); extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void);
extern size_t SharedConnectionStatsShmemSize(void);
extern void SharedConnectionStatsShmemInit(void);
extern int GetMaxClientConnections(void); extern int GetMaxClientConnections(void);
extern int GetMaxSharedPoolSize(void); extern int GetMaxSharedPoolSize(void);
extern int GetLocalSharedPoolSize(void); extern int GetLocalSharedPoolSize(void);