mirror of https://github.com/citusdata/citus.git
Add connparam invalidation trigger creation logic
This needs to live in Community, since we haven't yet added the com- plication of having divergent upgrade scripts in Enterprise.pull/2229/head
parent
19cadf52ca
commit
7a75c2ed31
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '7.5-2'
|
default_version = '7.5-3'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -16,7 +16,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
7.2-1 7.2-2 7.2-3 \
|
7.2-1 7.2-2 7.2-3 \
|
||||||
7.3-1 7.3-2 7.3-3 \
|
7.3-1 7.3-2 7.3-3 \
|
||||||
7.4-1 7.4-2 7.4-3 \
|
7.4-1 7.4-2 7.4-3 \
|
||||||
7.5-1 7.5-2
|
7.5-1 7.5-2 7.5-3
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -204,6 +204,8 @@ $(EXTENSION)--7.5-1.sql: $(EXTENSION)--7.4-3.sql $(EXTENSION)--7.4-3--7.5-1.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--7.5-2.sql: $(EXTENSION)--7.5-1.sql $(EXTENSION)--7.5-1--7.5-2.sql
|
$(EXTENSION)--7.5-2.sql: $(EXTENSION)--7.5-1.sql $(EXTENSION)--7.5-1--7.5-2.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--7.5-3.sql: $(EXTENSION)--7.5-2.sql $(EXTENSION)--7.5-2--7.5-3.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
/* citus--7.5-2--7.5-3 */
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
CREATE FUNCTION master_dist_authinfo_cache_invalidate()
|
||||||
|
RETURNS trigger
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'citus', $$master_dist_authinfo_cache_invalidate$$;
|
||||||
|
COMMENT ON FUNCTION master_dist_authinfo_cache_invalidate()
|
||||||
|
IS 'register authinfo cache invalidation on any modifications';
|
||||||
|
|
||||||
|
CREATE FUNCTION task_tracker_conninfo_cache_invalidate()
|
||||||
|
RETURNS trigger
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'citus', $$task_tracker_conninfo_cache_invalidate$$;
|
||||||
|
COMMENT ON FUNCTION task_tracker_conninfo_cache_invalidate()
|
||||||
|
IS 'invalidate task-tracker conninfo cache';
|
||||||
|
|
||||||
|
CREATE TRIGGER dist_authinfo_cache_invalidate
|
||||||
|
AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE
|
||||||
|
ON pg_catalog.pg_dist_authinfo
|
||||||
|
FOR EACH STATEMENT EXECUTE PROCEDURE master_dist_authinfo_cache_invalidate();
|
||||||
|
|
||||||
|
CREATE TRIGGER dist_authinfo_task_tracker_cache_invalidate
|
||||||
|
AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE
|
||||||
|
ON pg_catalog.pg_dist_authinfo
|
||||||
|
FOR EACH STATEMENT EXECUTE PROCEDURE task_tracker_conninfo_cache_invalidate();
|
||||||
|
|
||||||
|
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '7.5-2'
|
default_version = '7.5-3'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -645,10 +645,12 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
||||||
|
|
||||||
/* search our cache for precomputed connection settings */
|
/* search our cache for precomputed connection settings */
|
||||||
entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
||||||
if (!found)
|
if (!found || !entry->isValid)
|
||||||
{
|
{
|
||||||
/* if they're not found, compute them from GUC, runtime, etc. */
|
/* if they're not found, compute them from GUC, runtime, etc. */
|
||||||
GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext);
|
GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext);
|
||||||
|
|
||||||
|
entry->isValid = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection));
|
connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection));
|
||||||
|
|
|
@ -212,6 +212,7 @@ PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
|
||||||
PG_FUNCTION_INFO_V1(master_dist_placement_cache_invalidate);
|
PG_FUNCTION_INFO_V1(master_dist_placement_cache_invalidate);
|
||||||
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
|
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
|
||||||
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
|
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
|
||||||
|
PG_FUNCTION_INFO_V1(master_dist_authinfo_cache_invalidate);
|
||||||
PG_FUNCTION_INFO_V1(role_exists);
|
PG_FUNCTION_INFO_V1(role_exists);
|
||||||
PG_FUNCTION_INFO_V1(authinfo_valid);
|
PG_FUNCTION_INFO_V1(authinfo_valid);
|
||||||
|
|
||||||
|
@ -2418,6 +2419,31 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* master_dist_authinfo_cache_invalidate is a trigger function that performs
|
||||||
|
* relcache invalidations when the contents of pg_dist_authinfo are changed
|
||||||
|
* on the SQL level.
|
||||||
|
*
|
||||||
|
* NB: We decided there is little point in checking permissions here, there
|
||||||
|
* are much easier ways to waste CPU than causing cache invalidations.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
master_dist_authinfo_cache_invalidate(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
if (!CALLED_AS_TRIGGER(fcinfo))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
||||||
|
errmsg("must be called as trigger")));
|
||||||
|
}
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
/* no-op in community edition */
|
||||||
|
|
||||||
|
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* master_dist_local_group_cache_invalidate is a trigger function that performs
|
* master_dist_local_group_cache_invalidate is a trigger function that performs
|
||||||
* relcache invalidations when the contents of pg_dist_local_group are changed
|
* relcache invalidations when the contents of pg_dist_local_group are changed
|
||||||
|
@ -2951,11 +2977,21 @@ CreateDistTableCache(void)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
|
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
|
||||||
* and invalidates the worker node and local group ID caches.
|
* and invalidates the worker node, ConnParams, and local group ID caches.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
InvalidateMetadataSystemCache(void)
|
InvalidateMetadataSystemCache(void)
|
||||||
{
|
{
|
||||||
|
ConnParamsHashEntry *entry = NULL;
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
|
||||||
|
hash_seq_init(&status, ConnParamsHash);
|
||||||
|
|
||||||
|
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
||||||
|
{
|
||||||
|
entry->isValid = false;
|
||||||
|
}
|
||||||
|
|
||||||
memset(&MetadataCache, 0, sizeof(MetadataCache));
|
memset(&MetadataCache, 0, sizeof(MetadataCache));
|
||||||
workerNodeHashValid = false;
|
workerNodeHashValid = false;
|
||||||
LocalGroupId = -1;
|
LocalGroupId = -1;
|
||||||
|
|
|
@ -607,6 +607,8 @@ TaskTrackerShmemInit(void)
|
||||||
|
|
||||||
LWLockInitialize(&WorkerTasksSharedState->taskHashLock,
|
LWLockInitialize(&WorkerTasksSharedState->taskHashLock,
|
||||||
WorkerTasksSharedState->taskHashTrancheId);
|
WorkerTasksSharedState->taskHashTrancheId);
|
||||||
|
|
||||||
|
WorkerTasksSharedState->conninfosValid = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* allocate hash table */
|
/* allocate hash table */
|
||||||
|
@ -869,6 +871,19 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash)
|
||||||
|
|
||||||
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
if (!WorkerTasksSharedState->conninfosValid)
|
||||||
|
{
|
||||||
|
ConnParamsHashEntry *entry = NULL;
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
|
||||||
|
hash_seq_init(&status, ConnParamsHash);
|
||||||
|
|
||||||
|
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
||||||
|
{
|
||||||
|
entry->isValid = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* schedule new tasks if we have any */
|
/* schedule new tasks if we have any */
|
||||||
if (schedulableTaskList != NIL)
|
if (schedulableTaskList != NIL)
|
||||||
{
|
{
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
#include "commands/schemacmds.h"
|
#include "commands/schemacmds.h"
|
||||||
|
#include "commands/trigger.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
@ -46,6 +47,7 @@ static void CleanupTask(WorkerTask *workerTask);
|
||||||
PG_FUNCTION_INFO_V1(task_tracker_assign_task);
|
PG_FUNCTION_INFO_V1(task_tracker_assign_task);
|
||||||
PG_FUNCTION_INFO_V1(task_tracker_task_status);
|
PG_FUNCTION_INFO_V1(task_tracker_task_status);
|
||||||
PG_FUNCTION_INFO_V1(task_tracker_cleanup_job);
|
PG_FUNCTION_INFO_V1(task_tracker_cleanup_job);
|
||||||
|
PG_FUNCTION_INFO_V1(task_tracker_conninfo_cache_invalidate);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -222,6 +224,30 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* task_tracker_conninfo_cache_invalidate is a trigger function that signals to
|
||||||
|
* the task tracker to refresh its conn params cache after a authinfo change.
|
||||||
|
*
|
||||||
|
* NB: We decided there is little point in checking permissions here, there
|
||||||
|
* are much easier ways to waste CPU than causing cache invalidations.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
task_tracker_conninfo_cache_invalidate(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
if (!CALLED_AS_TRIGGER(fcinfo))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
||||||
|
errmsg("must be called as trigger")));
|
||||||
|
}
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
/* no-op in community edition */
|
||||||
|
|
||||||
|
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TaskTrackerRunning checks if the task tracker process is running. To do this,
|
* TaskTrackerRunning checks if the task tracker process is running. To do this,
|
||||||
* the function checks if the task tracker is configured to start up, and infers
|
* the function checks if the task tracker is configured to start up, and infers
|
||||||
|
|
|
@ -117,6 +117,7 @@ typedef struct ConnectionHashEntry
|
||||||
typedef struct ConnParamsHashEntry
|
typedef struct ConnParamsHashEntry
|
||||||
{
|
{
|
||||||
ConnectionHashKey key;
|
ConnectionHashKey key;
|
||||||
|
bool isValid;
|
||||||
char **keywords;
|
char **keywords;
|
||||||
char **values;
|
char **values;
|
||||||
} ConnParamsHashEntry;
|
} ConnParamsHashEntry;
|
||||||
|
@ -130,6 +131,7 @@ extern char *NodeConninfo;
|
||||||
|
|
||||||
/* the hash table */
|
/* the hash table */
|
||||||
extern HTAB *ConnectionHash;
|
extern HTAB *ConnectionHash;
|
||||||
|
extern HTAB *ConnParamsHash;
|
||||||
|
|
||||||
/* context for all connection and transaction related memory */
|
/* context for all connection and transaction related memory */
|
||||||
extern struct MemoryContextData *ConnectionContext;
|
extern struct MemoryContextData *ConnectionContext;
|
||||||
|
|
|
@ -109,6 +109,7 @@ typedef struct WorkerTasksSharedStateData
|
||||||
LWLockTranche taskHashLockTranche;
|
LWLockTranche taskHashLockTranche;
|
||||||
#endif
|
#endif
|
||||||
LWLock taskHashLock;
|
LWLock taskHashLock;
|
||||||
|
bool conninfosValid;
|
||||||
} WorkerTasksSharedStateData;
|
} WorkerTasksSharedStateData;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -136,6 +136,9 @@ ALTER EXTENSION citus UPDATE TO '7.3-3';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.4-1';
|
ALTER EXTENSION citus UPDATE TO '7.4-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.4-2';
|
ALTER EXTENSION citus UPDATE TO '7.4-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.4-3';
|
ALTER EXTENSION citus UPDATE TO '7.4-3';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.5-1';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.5-2';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.5-3';
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
|
|
|
@ -136,6 +136,9 @@ ALTER EXTENSION citus UPDATE TO '7.3-3';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.4-1';
|
ALTER EXTENSION citus UPDATE TO '7.4-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.4-2';
|
ALTER EXTENSION citus UPDATE TO '7.4-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.4-3';
|
ALTER EXTENSION citus UPDATE TO '7.4-3';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.5-1';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.5-2';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.5-3';
|
||||||
|
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
#define CITUS_EDITION "community"
|
#define CITUS_EDITION "community"
|
||||||
|
|
||||||
/* Extension version expected by this Citus build */
|
/* Extension version expected by this Citus build */
|
||||||
#define CITUS_EXTENSIONVERSION "7.5-2"
|
#define CITUS_EXTENSIONVERSION "7.5-3"
|
||||||
|
|
||||||
/* Citus major version as a string */
|
/* Citus major version as a string */
|
||||||
#define CITUS_MAJORVERSION "7.5"
|
#define CITUS_MAJORVERSION "7.5"
|
||||||
|
|
Loading…
Reference in New Issue