Distribute CALL on distributed procedures to metadata workers

Lots taken from https://github.com/citusdata/citus/pull/2829
pull/3026/head
Philip Dubé 2019-09-11 23:08:32 +00:00 committed by Philip Dubé
parent 932a407f07
commit bc1ad67eb5
9 changed files with 521 additions and 51 deletions

View File

@ -0,0 +1,181 @@
/*-------------------------------------------------------------------------
*
* call.c
* Commands for calling remote stored procedures.
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#if PG_VERSION_NUM >= 110000
#include "catalog/pg_proc.h"
#include "commands/defrem.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
#include "miscadmin.h"
#include "tcop/dest.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
/* local function forward declarations */
static bool CallFuncExprRemotely(CallStmt *callStmt,
DistObjectCacheEntry *procedure,
FuncExpr *funcExpr, const char *queryString,
DestReceiver *dest);
/*
 * CallDistributedProcedureRemotely attempts to run the procedure invoked by
 * callStmt on a worker node instead of running it locally.
 *
 * The procedure is looked up in the distributed object cache. When it has no
 * entry there the function returns false without doing anything; otherwise
 * it returns whatever CallFuncExprRemotely reports, i.e. true only when the
 * call was executed remotely and its results were sent to dest.
 */
bool
CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString,
								 DestReceiver *dest)
{
	FuncExpr *callExpr = callStmt->funcexpr;
	DistObjectCacheEntry *cacheEntry =
		LookupDistObjectCacheEntry(ProcedureRelationId, callExpr->funcid, 0);

	if (cacheEntry != NULL)
	{
		return CallFuncExprRemotely(callStmt, cacheEntry, callExpr, queryString, dest);
	}

	/* not a distributed procedure, nothing to push down */
	return false;
}
/*
 * CallFuncExprRemotely calls a procedure or function on the worker if possible.
 *
 * callStmt is the CALL statement being processed, procedure is the
 * distributed object cache entry of the called procedure, funcExpr is the
 * call expression whose arguments are inspected, queryString is the original
 * command text that is sent to the worker unchanged, and dest receives the
 * tuples returned by the remote call.
 *
 * Returns true when the call was executed on a worker and its result tuples
 * were forwarded to dest. Returns false, after logging the reason at DEBUG2,
 * when the call cannot be pushed down; nothing has been executed remotely in
 * that case.
 */
static bool
CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
					 FuncExpr *funcExpr, const char *queryString, DestReceiver *dest)
{
	Oid colocatedRelationId = InvalidOid;
	Const *partitionValue = NULL;
	ShardInterval *shardInterval = NULL;
	List *placementList = NIL;
	ListCell *argCell = NULL;
	DistTableCacheEntry *distTable = NULL;
	ShardPlacement *placement = NULL;
	WorkerNode *workerNode = NULL;

	if (IsMultiStatementTransaction())
	{
		ereport(DEBUG2, (errmsg("cannot push down CALL in multi-statement transaction")));
		return false;
	}

	colocatedRelationId = ColocatedTableId(procedure->colocationId);
	if (colocatedRelationId == InvalidOid)
	{
		ereport(DEBUG2, (errmsg("stored procedure does not have co-located tables")));
		return false;
	}

	if (procedure->distributionArgIndex < 0 ||
		procedure->distributionArgIndex >= list_length(funcExpr->args))
	{
		ereport(DEBUG2, (errmsg("cannot push down invalid distribution_argument_index")));
		return false;
	}

	/* every argument must already be a constant, not only the distribution one */
	foreach(argCell, funcExpr->args)
	{
		Node *argNode = (Node *) lfirst(argCell);

		if (!IsA(argNode, Const))
		{
			ereport(DEBUG2, (errmsg("cannot push down non-constant argument value")));
			return false;
		}
	}

	partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex);

	/*
	 * A NULL constant carries no usable constvalue, so it cannot be mapped
	 * to a shard. Bail out instead of handing a dummy Datum to
	 * FindShardInterval.
	 */
	if (partitionValue->constisnull)
	{
		ereport(DEBUG2, (errmsg("cannot push down call, distribution argument is NULL")));
		return false;
	}

	distTable = DistributedTableCacheEntry(colocatedRelationId);
	shardInterval = FindShardInterval(partitionValue->constvalue, distTable);
	if (shardInterval == NULL)
	{
		ereport(DEBUG2, (errmsg("cannot push down call, failed to find shard interval")));
		return false;
	}

	placementList = FinalizedShardPlacementList(shardInterval->shardId);
	if (list_length(placementList) != 1)
	{
		/* punt on reference tables for now */
		ereport(DEBUG2, (errmsg(
							 "cannot push down CALL for reference tables or replicated distributed tables")));
		return false;
	}

	placement = (ShardPlacement *) linitial(placementList);
	workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);

	/*
	 * The worker can only run the procedure itself when it has metadata.
	 * Also guard against the lookup not finding the node at all, rather
	 * than dereferencing a NULL pointer.
	 */
	if (workerNode == NULL || !workerNode->hasMetadata)
	{
		ereport(DEBUG2, (errmsg("there is no worker node with metadata")));
		return false;
	}

	/* we can execute this procedure on the worker! */
	{
		Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
		TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
		TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDesc,
															  &TTSOpsMinimalTuple);
		Task *task = CitusMakeNode(Task);

		/* single task that ships the original command text to the placement */
		task->jobId = INVALID_JOB_ID;
		task->taskId = 0;
		task->taskType = DDL_TASK;
		task->queryString = pstrdup(queryString);
		task->replicationModel = REPLICATION_MODEL_INVALID;
		task->dependedTaskList = NIL;
		task->anchorShardId = placement->shardId;
		task->relationShardList = NIL;
		task->taskPlacementList = placementList;

		ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, list_make1(task),
								tupleDesc, tupleStore, true,
								MaxAdaptiveExecutorPoolSize);

		/* forward the collected tuples until the receiver stops accepting them */
		while (tuplestore_gettupleslot(tupleStore, true, false, slot))
		{
			if (!dest->receiveSlot(slot, dest))
			{
				break;
			}
		}

		/* Don't call tuplestore_end(tupleStore). It'll be freed soon enough in a top level CALL,
		 * & dest->receiveSlot could conceivably rely on slots being long lived.
		 */
	}

	return true;
}
#endif /* PG_VERSION_NUM >= 110000 */

View File

@ -201,6 +201,19 @@ multi_ProcessUtility(PlannedStmt *pstmt,
#if (PG_VERSION_NUM >= 110000) #if (PG_VERSION_NUM >= 110000)
if (IsA(parsetree, CallStmt)) if (IsA(parsetree, CallStmt))
{ {
CallStmt *callStmt = (CallStmt *) parsetree;
/*
* If the procedure is distributed and we are using MX then we have the
* possibility of calling it on the worker. If the data is located on
* the worker this can avoid making many network round trips.
*/
if (context == PROCESS_UTILITY_TOPLEVEL &&
CallDistributedProcedureRemotely(callStmt, queryString, dest))
{
return;
}
/* /*
* Stored procedures are a bit strange in the sense that some statements * Stored procedures are a bit strange in the sense that some statements
* are not in a transaction block, but can be rolled back. We need to * are not in a transaction block, but can be rolled back. We need to

View File

@ -244,7 +244,7 @@ IsObjectDistributed(const ObjectAddress *address)
ScanKeyInit(&key[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ, ScanKeyInit(&key[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(address->objectId)); ObjectIdGetDatum(address->objectId));
ScanKeyInit(&key[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber, F_INT4EQ, ScanKeyInit(&key[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber, F_INT4EQ,
ObjectIdGetDatum(address->objectSubId)); Int32GetDatum(address->objectSubId));
pgDistObjectScan = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(), pgDistObjectScan = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(),
true, NULL, 3, key); true, NULL, 3, key);

View File

@ -32,6 +32,7 @@ COMMENT ON FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, obj
IS 'remove an object address from citus.pg_dist_object once the object has been deleted'; IS 'remove an object address from citus.pg_dist_object once the object has been deleted';
CREATE TABLE citus.pg_dist_object ( CREATE TABLE citus.pg_dist_object (
-- primary key
classid oid NOT NULL, classid oid NOT NULL,
objid oid NOT NULL, objid oid NOT NULL,
objsubid integer NOT NULL, objsubid integer NOT NULL,
@ -49,6 +50,17 @@ CREATE TABLE citus.pg_dist_object (
CONSTRAINT pg_dist_object_pkey PRIMARY KEY (classid, objid, objsubid) CONSTRAINT pg_dist_object_pkey PRIMARY KEY (classid, objid, objsubid)
); );
-- C-language trigger function used to invalidate cached pg_dist_object data
CREATE FUNCTION master_dist_object_cache_invalidate()
RETURNS trigger
LANGUAGE C
AS 'MODULE_PATHNAME', $$master_dist_object_cache_invalidate$$;
COMMENT ON FUNCTION master_dist_object_cache_invalidate()
IS 'register relcache invalidation for changed rows';
-- fire the invalidation function after every row-level change to citus.pg_dist_object
CREATE TRIGGER dist_object_cache_invalidate
AFTER INSERT OR UPDATE OR DELETE
ON citus.pg_dist_object
FOR EACH ROW EXECUTE PROCEDURE master_dist_object_cache_invalidate();
#include "udfs/create_distributed_function/9.0-1.sql" #include "udfs/create_distributed_function/9.0-1.sql"
#include "udfs/citus_drop_trigger/9.0-1.sql" #include "udfs/citus_drop_trigger/9.0-1.sql"

View File

@ -73,6 +73,7 @@ BEGIN
FROM pg_catalog.pg_dist_partition p; FROM pg_catalog.pg_dist_partition p;
-- restore pg_dist_object from the stable identifiers -- restore pg_dist_object from the stable identifiers
-- DELETE/INSERT to avoid primary key violations
WITH old_records AS ( WITH old_records AS (
DELETE FROM DELETE FROM
citus.pg_dist_object citus.pg_dist_object

View File

@ -73,6 +73,7 @@ BEGIN
FROM pg_catalog.pg_dist_partition p; FROM pg_catalog.pg_dist_partition p;
-- restore pg_dist_object from the stable identifiers -- restore pg_dist_object from the stable identifiers
-- DELETE/INSERT to avoid primary key violations
WITH old_records AS ( WITH old_records AS (
DELETE FROM DELETE FROM
citus.pg_dist_object citus.pg_dist_object

View File

@ -34,6 +34,7 @@
#include "distributed/function_utils.h" #include "distributed/function_utils.h"
#include "distributed/foreign_key_relationship.h" #include "distributed/foreign_key_relationship.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_local_group.h"
@ -55,7 +56,6 @@
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h" #include "utils/datum.h"
#include "utils/elog.h" #include "utils/elog.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -163,6 +163,9 @@ static HTAB *DistTableCacheHash = NULL;
static HTAB *DistShardCacheHash = NULL; static HTAB *DistShardCacheHash = NULL;
static MemoryContext MetadataCacheMemoryContext = NULL; static MemoryContext MetadataCacheMemoryContext = NULL;
/* Hash table for information about each object */
static HTAB *DistObjectCacheHash = NULL;
/* Hash table for informations about worker nodes */ /* Hash table for informations about worker nodes */
static HTAB *WorkerNodeHash = NULL; static HTAB *WorkerNodeHash = NULL;
static WorkerNode **WorkerNodeArray = NULL; static WorkerNode **WorkerNodeArray = NULL;
@ -172,9 +175,10 @@ static bool workerNodeHashValid = false;
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
static int32 LocalGroupId = -1; static int32 LocalGroupId = -1;
/* built first time through in InitializeDistTableCache */ /* built first time through in InitializeDistCache */
static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistPartitionScanKey[1];
static ScanKeyData DistShardScanKey[1]; static ScanKeyData DistShardScanKey[1];
static ScanKeyData DistObjectScanKey[3];
/* local function forward declarations */ /* local function forward declarations */
@ -197,7 +201,8 @@ static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength, int shardIntervalArrayLength,
FmgrInfo *shardIntervalSortCompareFunction); FmgrInfo *shardIntervalSortCompareFunction);
static void InitializeCaches(void); static void InitializeCaches(void);
static void InitializeDistTableCache(void); static void InitializeDistCache(void);
static void InitializeDistObjectCache(void);
static void InitializeWorkerNodeCache(void); static void InitializeWorkerNodeCache(void);
static void RegisterForeignKeyGraphCacheCallbacks(void); static void RegisterForeignKeyGraphCacheCallbacks(void);
static void RegisterWorkerNodeCacheCallbacks(void); static void RegisterWorkerNodeCacheCallbacks(void);
@ -205,6 +210,7 @@ static void RegisterLocalGroupIdCacheCallbacks(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize); static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void CreateDistTableCache(void); static void CreateDistTableCache(void);
static void CreateDistObjectCache(void);
static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId); static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
@ -224,7 +230,8 @@ static void CachedRelationNamespaceLookup(const char *relationName, Oid relnames
static ShardPlacement * ResolveGroupShardPlacement( static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry);
static Oid LookupEnumValueId(Oid typeId, char *valueName); static Oid LookupEnumValueId(Oid typeId, char *valueName);
static void InvalidateEntireDistCache(void); static void InvalidateDistTableCache(void);
static void InvalidateDistObjectCache(void);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
@ -234,6 +241,7 @@ PG_FUNCTION_INFO_V1(master_dist_placement_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_authinfo_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_authinfo_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_object_cache_invalidate);
PG_FUNCTION_INFO_V1(role_exists); PG_FUNCTION_INFO_V1(role_exists);
PG_FUNCTION_INFO_V1(authinfo_valid); PG_FUNCTION_INFO_V1(authinfo_valid);
PG_FUNCTION_INFO_V1(poolinfo_valid); PG_FUNCTION_INFO_V1(poolinfo_valid);
@ -789,9 +797,8 @@ LookupShardCacheEntry(int64 shardId)
DistTableCacheEntry * DistTableCacheEntry *
DistributedTableCacheEntry(Oid distributedRelationId) DistributedTableCacheEntry(Oid distributedRelationId)
{ {
DistTableCacheEntry *cacheEntry = NULL; DistTableCacheEntry *cacheEntry =
LookupDistTableCacheEntry(distributedRelationId);
cacheEntry = LookupDistTableCacheEntry(distributedRelationId);
if (cacheEntry && cacheEntry->isDistributedTable) if (cacheEntry && cacheEntry->isDistributedTable)
{ {
@ -805,6 +812,26 @@ DistributedTableCacheEntry(Oid distributedRelationId)
} }
/*
 * DistributedObjectCacheEntry returns the cached citus.pg_dist_object
 * metadata of the object identified by (classid, objid, objsubid).
 *
 * Unlike LookupDistObjectCacheEntry, which returns NULL for objects without
 * metadata, this function raises an ERROR in that case.
 */
DistObjectCacheEntry *
DistributedObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
{
	DistObjectCacheEntry *entry = LookupDistObjectCacheEntry(classid, objid, objsubid);

	if (entry == NULL)
	{
		ereport(ERROR, (errmsg("object is not distributed")));
	}

	return entry;
}
/* /*
* LookupDistTableCacheEntry returns the distributed table metadata for the * LookupDistTableCacheEntry returns the distributed table metadata for the
* passed relationId. For efficiency it caches lookups. * passed relationId. For efficiency it caches lookups.
@ -899,6 +926,107 @@ LookupDistTableCacheEntry(Oid relationId)
} }
/*
 * LookupDistObjectCacheEntry returns the distributed object metadata for the
 * object identified by (classid, objid, objsubid). For efficiency it caches
 * lookups.
 *
 * Returns NULL when Citus has not been loaded or when the object has no row
 * in pg_dist_object. In the latter case an invalid entry for the key stays
 * behind in the hash table and is re-checked on the next lookup.
 *
 * A SQL NULL in distribution_argument_index is cached as -1 and a SQL NULL in
 * colocationid is cached as 0.
 */
DistObjectCacheEntry *
LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
{
	DistObjectCacheEntry *cacheEntry = NULL;
	bool foundInCache = false;
	DistObjectCacheEntryKey hashKey;
	Relation pgDistObjectRel = NULL;
	TupleDesc pgDistObjectTupleDesc = NULL;
	ScanKeyData pgDistObjectKey[3];
	SysScanDesc pgDistObjectScan = NULL;
	HeapTuple pgDistObjectTup = NULL;

	/* the key is hashed bytewise, so clear it completely before filling it */
	memset(&hashKey, 0, sizeof(DistObjectCacheEntryKey));
	hashKey.classid = classid;
	hashKey.objid = objid;
	hashKey.objsubid = objsubid;

	/*
	 * Can't be a distributed object if the extension hasn't been loaded
	 * yet. As we can't do lookups in nonexistent tables, directly return NULL
	 * here.
	 */
	if (!CitusHasBeenLoaded())
	{
		return NULL;
	}

	InitializeCaches();

	cacheEntry = hash_search(DistObjectCacheHash, &hashKey, HASH_ENTER, &foundInCache);

	/* return valid matches */
	if (foundInCache)
	{
		/*
		 * We might have some concurrent metadata changes. In order to get the changes,
		 * we first need to accept the cache invalidation messages.
		 */
		AcceptInvalidationMessages();

		if (cacheEntry->isValid)
		{
			return cacheEntry;
		}

		/* this is where we'd free old entry's out of band data if it had any */
	}

	/* zero out the whole entry, then restore the key the memset just wiped */
	memset(((char *) cacheEntry), 0, sizeof(DistObjectCacheEntry));
	cacheEntry->key.classid = classid;
	cacheEntry->key.objid = objid;
	cacheEntry->key.objsubid = objsubid;

	pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock);
	pgDistObjectTupleDesc = RelationGetDescr(pgDistObjectRel);

	ScanKeyInit(&pgDistObjectKey[0], Anum_pg_dist_object_classid,
				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(classid));
	ScanKeyInit(&pgDistObjectKey[1], Anum_pg_dist_object_objid,
				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(objid));
	ScanKeyInit(&pgDistObjectKey[2], Anum_pg_dist_object_objsubid,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(objsubid));

	pgDistObjectScan = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(),
										  true, NULL, 3, pgDistObjectKey);
	pgDistObjectTup = systable_getnext(pgDistObjectScan);

	if (HeapTupleIsValid(pgDistObjectTup))
	{
		Datum datumArray[Natts_pg_dist_object];
		bool isNullArray[Natts_pg_dist_object];
		int argIndexAttr = Anum_pg_dist_object_distribution_argument_index - 1;
		int colocationAttr = Anum_pg_dist_object_colocationid - 1;

		heap_deform_tuple(pgDistObjectTup, pgDistObjectTupleDesc, datumArray,
						  isNullArray);

		cacheEntry->isValid = true;

		/*
		 * A NULL column deforms to a zero Datum. Reading that blindly would
		 * make "no distribution argument" look like argument 0, so store an
		 * out-of-range index instead.
		 */
		cacheEntry->distributionArgIndex = -1;
		if (!isNullArray[argIndexAttr])
		{
			cacheEntry->distributionArgIndex =
				DatumGetInt32(datumArray[argIndexAttr]);
		}

		/* a NULL colocationid leaves the zero written by the memset above */
		if (!isNullArray[colocationAttr])
		{
			cacheEntry->colocationId = DatumGetInt32(datumArray[colocationAttr]);
		}
	}
	else
	{
		/* return NULL, cacheEntry left invalid in hash table */
		cacheEntry = NULL;
	}

	systable_endscan(pgDistObjectScan);
	relation_close(pgDistObjectRel, AccessShareLock);

	return cacheEntry;
}
/* /*
* BuildDistTableCacheEntry is a helper routine for * BuildDistTableCacheEntry is a helper routine for
* LookupDistTableCacheEntry() for building the cache contents. * LookupDistTableCacheEntry() for building the cache contents.
@ -2643,6 +2771,31 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
} }
/*
* master_dist_object_invalidate is a trigger function that performs relcache
* invalidation when the contents of pg_dist_object are changed on the SQL
* level.
*
* NB: We decided there is little point in checking permissions here, there
* are much easier ways to waste CPU than causing cache invalidations.
*/
Datum
master_dist_object_cache_invalidate(PG_FUNCTION_ARGS)
{
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistObjectRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
}
/* /*
* InitializeCaches() registers invalidation handlers for metadata_cache.c's * InitializeCaches() registers invalidation handlers for metadata_cache.c's
* caches. * caches.
@ -2670,18 +2823,12 @@ InitializeCaches(void)
/* set first, to avoid recursion dangers */ /* set first, to avoid recursion dangers */
performedInitialization = true; performedInitialization = true;
/* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
MetadataCacheMemoryContext = AllocSetContextCreate( MetadataCacheMemoryContext = AllocSetContextCreate(
CacheMemoryContext, CacheMemoryContext,
"MetadataCacheMemoryContext", "MetadataCacheMemoryContext",
ALLOCSET_DEFAULT_SIZES); ALLOCSET_DEFAULT_SIZES);
InitializeDistTableCache(); InitializeDistCache();
RegisterForeignKeyGraphCacheCallbacks(); RegisterForeignKeyGraphCacheCallbacks();
RegisterWorkerNodeCacheCallbacks(); RegisterWorkerNodeCacheCallbacks();
RegisterLocalGroupIdCacheCallbacks(); RegisterLocalGroupIdCacheCallbacks();
@ -2708,7 +2855,7 @@ InitializeCaches(void)
/* initialize the infrastructure for the metadata cache */ /* initialize the infrastructure for the metadata cache */
static void static void
InitializeDistTableCache(void) InitializeDistCache(void)
{ {
HASHCTL info; HASHCTL info;
@ -2733,9 +2880,10 @@ InitializeDistTableCache(void)
DistShardScanKey[0].sk_collation = InvalidOid; DistShardScanKey[0].sk_collation = InvalidOid;
DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid; DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid;
/* initialize the per-table hash table */
CreateDistTableCache(); CreateDistTableCache();
InitializeDistObjectCache();
/* initialize the per-shard hash table */ /* initialize the per-shard hash table */
MemSet(&info, 0, sizeof(info)); MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(int64); info.keysize = sizeof(int64);
@ -2752,6 +2900,40 @@ InitializeDistTableCache(void)
} }
static void
InitializeDistObjectCache(void)
{
/* build initial scan keys, copied for every relation scan */
memset(&DistObjectScanKey, 0, sizeof(DistObjectScanKey));
fmgr_info_cxt(F_OIDEQ,
&DistObjectScanKey[0].sk_func,
MetadataCacheMemoryContext);
DistObjectScanKey[0].sk_strategy = BTEqualStrategyNumber;
DistObjectScanKey[0].sk_subtype = InvalidOid;
DistObjectScanKey[0].sk_collation = InvalidOid;
DistObjectScanKey[0].sk_attno = Anum_pg_dist_object_classid;
fmgr_info_cxt(F_OIDEQ,
&DistObjectScanKey[1].sk_func,
MetadataCacheMemoryContext);
DistObjectScanKey[1].sk_strategy = BTEqualStrategyNumber;
DistObjectScanKey[1].sk_subtype = InvalidOid;
DistObjectScanKey[1].sk_collation = InvalidOid;
DistObjectScanKey[1].sk_attno = Anum_pg_dist_object_objid;
fmgr_info_cxt(F_INT4EQ,
&DistObjectScanKey[2].sk_func,
MetadataCacheMemoryContext);
DistObjectScanKey[2].sk_strategy = BTEqualStrategyNumber;
DistObjectScanKey[2].sk_subtype = InvalidOid;
DistObjectScanKey[2].sk_collation = InvalidOid;
DistObjectScanKey[2].sk_attno = Anum_pg_dist_object_objsubid;
CreateDistObjectCache();
}
/* /*
* GetWorkerNodeHash returns the worker node data as a hash with the nodename and * GetWorkerNodeHash returns the worker node data as a hash with the nodename and
* nodeport as a key. * nodeport as a key.
@ -3141,7 +3323,7 @@ InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId)
if (relationId == MetadataCache.distColocationRelationId) if (relationId == MetadataCache.distColocationRelationId)
{ {
SetForeignConstraintRelationshipGraphInvalid(); SetForeignConstraintRelationshipGraphInvalid();
InvalidateEntireDistCache(); InvalidateDistTableCache();
} }
} }
@ -3180,7 +3362,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
/* invalidate either entire cache or a specific entry */ /* invalidate either entire cache or a specific entry */
if (relationId == InvalidOid) if (relationId == InvalidOid)
{ {
InvalidateEntireDistCache(); InvalidateDistTableCache();
InvalidateDistObjectCache();
} }
else else
{ {
@ -3194,25 +3377,30 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
{ {
cacheEntry->isValid = false; cacheEntry->isValid = false;
} }
}
/* /*
* If pg_dist_partition is being invalidated drop all state * If pg_dist_partition is being invalidated drop all state
* This happens pretty rarely, but most importantly happens during * This happens pretty rarely, but most importantly happens during
* DROP EXTENSION citus; * DROP EXTENSION citus;
*/ */
if (relationId != InvalidOid && relationId == MetadataCache.distPartitionRelationId) if (relationId == MetadataCache.distPartitionRelationId)
{ {
InvalidateMetadataSystemCache(); InvalidateMetadataSystemCache();
}
if (relationId == MetadataCache.distObjectRelationId)
{
InvalidateDistObjectCache();
}
} }
} }
/* /*
* InvalidateEntireDistCache makes entire cache entries invalid. * InvalidateDistTableCache marks all DistTableCacheHash entries invalid.
*/ */
static void static void
InvalidateEntireDistCache(void) InvalidateDistTableCache(void)
{ {
DistTableCacheEntry *cacheEntry = NULL; DistTableCacheEntry *cacheEntry = NULL;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
@ -3226,6 +3414,24 @@ InvalidateEntireDistCache(void)
} }
/*
 * InvalidateDistObjectCache walks DistObjectCacheHash and clears the isValid
 * flag of every entry. Entries are kept in the hash table; they are only
 * flagged so that the next lookup reloads them.
 */
static void
InvalidateDistObjectCache(void)
{
	HASH_SEQ_STATUS seqStatus;
	DistObjectCacheEntry *entry = NULL;

	hash_seq_init(&seqStatus, DistObjectCacheHash);

	for (entry = (DistObjectCacheEntry *) hash_seq_search(&seqStatus);
		 entry != NULL;
		 entry = (DistObjectCacheEntry *) hash_seq_search(&seqStatus))
	{
		entry->isValid = false;
	}
}
/* /*
* FlushDistTableCache flushes the entire distributed relation cache, frees * FlushDistTableCache flushes the entire distributed relation cache, frees
* all entries, and recreates the cache. * all entries, and recreates the cache.
@ -3264,6 +3470,22 @@ CreateDistTableCache(void)
} }
/*
 * CreateDistObjectCache initializes the per-object hash table. Entries are
 * keyed by DistObjectCacheEntryKey, hashed with tag_hash, and allocated in
 * MetadataCacheMemoryContext.
 */
static void
CreateDistObjectCache(void)
{
	const int hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
	const long initialEntryCount = 32;
	HASHCTL hashInfo;

	MemSet(&hashInfo, 0, sizeof(hashInfo));
	hashInfo.hcxt = MetadataCacheMemoryContext;
	hashInfo.hash = tag_hash;
	hashInfo.keysize = sizeof(DistObjectCacheEntryKey);
	hashInfo.entrysize = sizeof(DistObjectCacheEntry);

	DistObjectCacheHash = hash_create("Distributed Object Cache", initialEntryCount,
									  &hashInfo, hashFlags);
}
/* /*
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag, * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
* and invalidates the worker node, ConnParams, and local group ID caches. * and invalidates the worker node, ConnParams, and local group ID caches.
@ -3309,9 +3531,7 @@ DistTableOidList(void)
Datum relationIdDatum = heap_getattr(heapTuple, Datum relationIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_partition_logicalrelid, Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull); tupleDescriptor, &isNull);
relationId = DatumGetObjectId(relationIdDatum); relationId = DatumGetObjectId(relationIdDatum);
distTableOidList = lappend_oid(distTableOidList, relationId); distTableOidList = lappend_oid(distTableOidList, relationId);
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
@ -3410,7 +3630,8 @@ LookupDistShardTuples(Oid relationId)
/* set scan arguments */ /* set scan arguments */
scanKey[0].sk_argument = ObjectIdGetDatum(relationId); scanKey[0].sk_argument = ObjectIdGetDatum(relationId);
scanDescriptor = systable_beginscan(pgDistShard, DistShardLogicalRelidIndexId(), true, scanDescriptor = systable_beginscan(pgDistShard,
DistShardLogicalRelidIndexId(), true,
NULL, 1, scanKey); NULL, 1, scanKey);
currentShardTuple = systable_getnext(scanDescriptor); currentShardTuple = systable_getnext(scanDescriptor);
@ -3539,7 +3760,8 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
* ShardInterval using the provided descriptor and partition type information. * ShardInterval using the provided descriptor and partition type information.
*/ */
static ShardInterval * static ShardInterval *
TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid intervalTypeId, TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid
intervalTypeId,
int32 intervalTypeMod) int32 intervalTypeMod)
{ {
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
@ -3569,7 +3791,8 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva
*/ */
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]); relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid -
1]);
shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]); shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]);
storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]); storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1]; minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
@ -3584,8 +3807,10 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva
char *maxValueString = TextDatumGetCString(maxValueTextDatum); char *maxValueString = TextDatumGetCString(maxValueTextDatum);
/* TODO: move this up the call stack to avoid per-tuple invocation? */ /* TODO: move this up the call stack to avoid per-tuple invocation? */
get_type_io_data(intervalTypeId, IOFunc_input, &intervalTypeLen, &intervalByVal, get_type_io_data(intervalTypeId, IOFunc_input, &intervalTypeLen,
&intervalAlign, &intervalDelim, &typeIoParam, &inputFunctionId); &intervalByVal,
&intervalAlign, &intervalDelim, &typeIoParam,
&inputFunctionId);
/* finally convert min/max values to their actual types */ /* finally convert min/max values to their actual types */
minValue = OidInputFunctionCall(inputFunctionId, minValueString, minValue = OidInputFunctionCall(inputFunctionId, minValueString,
@ -3649,7 +3874,8 @@ CachedRelationLookup(const char *relationName, Oid *cachedOid)
static void static void
CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, Oid *cachedOid) CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace,
Oid *cachedOid)
{ {
/* force callbacks to be registered, so we always get notified upon changes */ /* force callbacks to be registered, so we always get notified upon changes */
InitializeCaches(); InitializeCaches();
@ -3660,8 +3886,9 @@ CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, Oid *c
if (*cachedOid == InvalidOid) if (*cachedOid == InvalidOid)
{ {
ereport(ERROR, (errmsg("cache lookup failed for %s, called too early?", ereport(ERROR, (errmsg(
relationName))); "cache lookup failed for %s, called too early?",
relationName)));
} }
} }
} }
@ -3738,8 +3965,9 @@ CitusInvalidateRelcacheByShardId(int64 shardId)
* *
* Hence we just emit a DEBUG5 message. * Hence we just emit a DEBUG5 message.
*/ */
ereport(DEBUG5, (errmsg("could not find distributed relation to invalidate for " ereport(DEBUG5, (errmsg(
"shard "INT64_FORMAT, shardId))); "could not find distributed relation to invalidate for "
"shard "INT64_FORMAT, shardId)));
} }
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
@ -3766,7 +3994,8 @@ DistNodeMetadata(void)
Relation pgDistNodeMetadata = NULL; Relation pgDistNodeMetadata = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
metadataTableOid = get_relname_relid("pg_dist_node_metadata", PG_CATALOG_NAMESPACE); metadataTableOid = get_relname_relid("pg_dist_node_metadata",
PG_CATALOG_NAMESPACE);
if (metadataTableOid == InvalidOid) if (metadataTableOid == InvalidOid)
{ {
ereport(ERROR, (errmsg("pg_dist_node_metadata was not found"))); ereport(ERROR, (errmsg("pg_dist_node_metadata was not found")));
@ -3788,7 +4017,8 @@ DistNodeMetadata(void)
} }
else else
{ {
ereport(ERROR, (errmsg("could not find any entries in pg_dist_metadata"))); ereport(ERROR, (errmsg(
"could not find any entries in pg_dist_metadata")));
} }
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
@ -3821,11 +4051,13 @@ authinfo_valid(PG_FUNCTION_ARGS)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot write to pg_dist_authinfo"), errmsg("cannot write to pg_dist_authinfo"),
errdetail("Citus Community Edition does not support the use of " errdetail(
"custom authentication options."), "Citus Community Edition does not support the use of "
errhint("To learn more about using advanced authentication schemes " "custom authentication options."),
"with Citus, please contact us at " errhint(
"https://citusdata.com/about/contact_us"))); "To learn more about using advanced authentication schemes "
"with Citus, please contact us at "
"https://citusdata.com/about/contact_us")));
} }
@ -3838,8 +4070,9 @@ poolinfo_valid(PG_FUNCTION_ARGS)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot write to pg_dist_poolinfo"), errmsg("cannot write to pg_dist_poolinfo"),
errdetail("Citus Community Edition does not support the use of " errdetail(
"pooler options."), "Citus Community Edition does not support the use of "
"pooler options."),
errhint("To learn more about using advanced pooling schemes " errhint("To learn more about using advanced pooling schemes "
"with Citus, please contact us at " "with Citus, please contact us at "
"https://citusdata.com/about/contact_us"))); "https://citusdata.com/about/contact_us")));

View File

@ -17,11 +17,18 @@
#include "utils/rel.h" #include "utils/rel.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "tcop/dest.h"
/* cluster.c - forward declarations */ /* cluster.c - forward declarations */
extern List * PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterCommand); extern List * PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterCommand);
#if PG_VERSION_NUM >= 110000
/* call.c - forward declarations */
/* returns true when the CALL was executed on a worker and results were sent to dest */
extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, const char *callCommand,
DestReceiver *dest);
#endif /* PG_VERSION_NUM >= 110000 */
/* extension.c - forward declarations */ /* extension.c - forward declarations */
extern bool IsCitusExtensionStmt(Node *parsetree); extern bool IsCitusExtensionStmt(Node *parsetree);

View File

@ -89,6 +89,24 @@ typedef struct
int *arrayOfPlacementArrayLengths; int *arrayOfPlacementArrayLengths;
} DistTableCacheEntry; } DistTableCacheEntry;
/*
 * DistObjectCacheEntryKey identifies one distributed object by its
 * (classid, objid, objsubid) triple. It is the lookup key embedded at the
 * start of DistObjectCacheEntry.
 */
typedef struct DistObjectCacheEntryKey
{
Oid classid;
Oid objid;
int32 objsubid;
} DistObjectCacheEntryKey;
/*
 * DistObjectCacheEntry holds the cached metadata of one distributed object.
 */
typedef struct DistObjectCacheEntry
{
/* lookup key - must be first. */
DistObjectCacheEntryKey key;
/* false when the entry must be reloaded before its fields can be trusted */
bool isValid;
/* value read from the distribution_argument_index column */
int distributionArgIndex;
/* value read from the colocationid column */
int colocationId;
} DistObjectCacheEntry;
extern bool IsDistributedTable(Oid relationId); extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void); extern List * DistributedTableList(void);
@ -99,6 +117,10 @@ extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern DistObjectCacheEntry * DistributedObjectCacheEntry(Oid classid, Oid objid, int32
objsubid);
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
objsubid);
extern int32 GetLocalGroupId(void); extern int32 GetLocalGroupId(void);
extern List * DistTableOidList(void); extern List * DistTableOidList(void);
extern Oid LookupShardRelation(int64 shardId, bool missing_ok); extern Oid LookupShardRelation(int64 shardId, bool missing_ok);