diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c new file mode 100644 index 000000000..d9cc3ee90 --- /dev/null +++ b/src/backend/distributed/commands/call.c @@ -0,0 +1,181 @@ +/*------------------------------------------------------------------------- + * + * call.c + * Commands for calling remote stored procedures. + * + * Copyright (c) 2019, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#if PG_VERSION_NUM >= 110000 + +#include "catalog/pg_proc.h" +#include "commands/defrem.h" +#include "distributed/colocation_utils.h" +#include "distributed/commands.h" +#include "distributed/connection_management.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/remote_commands.h" +#include "distributed/shard_pruning.h" +#include "distributed/version_compat.h" +#include "distributed/worker_manager.h" +#include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/primnodes.h" +#include "miscadmin.h" +#include "tcop/dest.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" + +static bool CallFuncExprRemotely(CallStmt *callStmt, + DistObjectCacheEntry *procedure, + FuncExpr *funcExpr, const char *queryString, + DestReceiver *dest); + +/* + * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. + */ +bool +CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString, + DestReceiver *dest) +{ + DistObjectCacheEntry *procedure = NULL; + FuncExpr *funcExpr = callStmt->funcexpr; + Oid functionId = funcExpr->funcid; + + procedure = LookupDistObjectCacheEntry(ProcedureRelationId, functionId, 0); + if (procedure == NULL) + { + return false; + } + + return CallFuncExprRemotely(callStmt, procedure, funcExpr, queryString, dest); +} + + +/* + * CallFuncExprRemotely calls a procedure or function on the worker if possible.
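+ * + * The CALL is only delegated when it is not inside a multi-statement transaction, all of its arguments are constants, the distribution argument prunes to exactly one finalized shard placement, and that placement lives on a metadata-enabled worker; otherwise this returns false and the CALL runs through the regular local code path.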
+ */ +static bool +CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, + FuncExpr *funcExpr, const char *queryString, DestReceiver *dest) +{ + Oid colocatedRelationId = InvalidOid; + Const *partitionValue = NULL; + ShardInterval *shardInterval = NULL; + List *placementList = NIL; + ListCell *argCell = NULL; + WorkerNode *preferredWorkerNode = NULL; + DistTableCacheEntry *distTable = NULL; + ShardPlacement *placement = NULL; + WorkerNode *workerNode = NULL; + + if (IsMultiStatementTransaction()) + { + ereport(DEBUG2, (errmsg("cannot push down CALL in multi-statement transaction"))); + return false; + } + + colocatedRelationId = ColocatedTableId(procedure->colocationId); + if (colocatedRelationId == InvalidOid) + { + ereport(DEBUG2, (errmsg("stored procedure does not have co-located tables"))); + return false; + } + + if (procedure->distributionArgIndex < 0 || + procedure->distributionArgIndex >= list_length(funcExpr->args)) + { + ereport(DEBUG2, (errmsg("cannot push down invalid distribution_argument_index"))); + return false; + } + + foreach(argCell, funcExpr->args) + { + Node *argNode = (Node *) lfirst(argCell); + if (!IsA(argNode, Const)) + { + ereport(DEBUG2, (errmsg("cannot push down non-constant argument value"))); + return false; + } + } + + partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex); + distTable = DistributedTableCacheEntry(colocatedRelationId); + shardInterval = FindShardInterval(partitionValue->constvalue, distTable); + + if (shardInterval == NULL) + { + ereport(DEBUG2, (errmsg("cannot push down call, failed to find shard interval"))); + return false; + } + + placementList = FinalizedShardPlacementList(shardInterval->shardId); + if (list_length(placementList) != 1) + { + /* punt on reference tables for now */ + ereport(DEBUG2, (errmsg( + "cannot push down CALL for reference tables or replicated distributed tables"))); + return false; + } + + placement = (ShardPlacement *) linitial(placementList); + workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); + + if (workerNode->hasMetadata) + { + /* we can execute this procedure on the worker! */ + preferredWorkerNode = workerNode; + } + + if (preferredWorkerNode == NULL) + { + ereport(DEBUG2, (errmsg("there is no worker node with metadata"))); + return false; + } + + { + Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem); + TupleDesc tupleDesc = CallStmtResultDesc(callStmt); + TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDesc, + &TTSOpsMinimalTuple); + + Task *task = CitusMakeNode(Task); + task->jobId = INVALID_JOB_ID; + task->taskId = 0; + task->taskType = DDL_TASK; + task->queryString = pstrdup(queryString); + task->replicationModel = REPLICATION_MODEL_INVALID; + task->dependedTaskList = NIL; + task->anchorShardId = placement->shardId; + task->relationShardList = NIL; + task->taskPlacementList = placementList; + + ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, list_make1(task), + tupleDesc, tupleStore, true, + MaxAdaptiveExecutorPoolSize); + + while (tuplestore_gettupleslot(tupleStore, true, false, slot)) + { + if (!dest->receiveSlot(slot, dest)) + { + break; + } + } + + /* Don't call tuplestore_end(tupleStore). It'll be freed soon enough in a top level CALL, + * & dest->receiveSlot could conceivably rely on slots being long lived. 
+ */ + } + + return true; +} + + +#endif /* PG_VERSION_NUM >= 110000 */ diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index ac0f27904..8f2664312 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -201,6 +201,19 @@ multi_ProcessUtility(PlannedStmt *pstmt, #if (PG_VERSION_NUM >= 110000) if (IsA(parsetree, CallStmt)) { + CallStmt *callStmt = (CallStmt *) parsetree; + + /* + * If the procedure is distributed and we are using MX then we have the + * possibility of calling it on the worker. If the data is located on + * the worker this can avoid making many network round trips. + */ + if (context == PROCESS_UTILITY_TOPLEVEL && + CallDistributedProcedureRemotely(callStmt, queryString, dest)) + { + return; + } + /* * Stored procedures are a bit strange in the sense that some statements * are not in a transaction block, but can be rolled back. We need to diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index ae0423300..ca1873364 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -244,7 +244,7 @@ IsObjectDistributed(const ObjectAddress *address) ScanKeyInit(&key[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(address->objectId)); ScanKeyInit(&key[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber, F_INT4EQ, - ObjectIdGetDatum(address->objectSubId)); + Int32GetDatum(address->objectSubId)); pgDistObjectScan = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(), true, NULL, 3, key); diff --git a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql index 508580ad1..32e05da72 100644 --- a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql +++ b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql @@ -32,6 +32,7 @@ COMMENT ON FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, obj IS 'remove an object address from citus.pg_dist_object once the object has been deleted'; CREATE TABLE citus.pg_dist_object ( + -- primary key classid oid NOT NULL, objid oid NOT NULL, objsubid integer NOT NULL, @@ -49,6 +50,17 @@ CREATE TABLE citus.pg_dist_object ( CONSTRAINT pg_dist_object_pkey PRIMARY KEY (classid, objid, objsubid) ); +CREATE FUNCTION master_dist_object_cache_invalidate() + RETURNS trigger + LANGUAGE C + AS 'MODULE_PATHNAME', $$master_dist_object_cache_invalidate$$; +COMMENT ON FUNCTION master_dist_object_cache_invalidate() + IS 'register relcache invalidation for changed rows'; +CREATE TRIGGER dist_object_cache_invalidate + AFTER INSERT OR UPDATE OR DELETE + ON citus.pg_dist_object + FOR EACH ROW EXECUTE PROCEDURE master_dist_object_cache_invalidate(); + #include "udfs/create_distributed_function/9.0-1.sql" #include "udfs/citus_drop_trigger/9.0-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.0-1.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.0-1.sql index 95d56300b..042f649e7 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.0-1.sql @@ -73,6 +73,7 @@ BEGIN FROM pg_catalog.pg_dist_partition p; -- restore pg_dist_object from the stable identifiers + -- DELETE/INSERT to avoid primary key violations WITH old_records AS ( DELETE FROM citus.pg_dist_object diff --git 
a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql index 95d56300b..042f649e7 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql @@ -73,6 +73,7 @@ BEGIN FROM pg_catalog.pg_dist_partition p; -- restore pg_dist_object from the stable identifiers + -- DELETE/INSERT to avoid primary key violations WITH old_records AS ( DELETE FROM citus.pg_dist_object diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 680cf3149..b6d04e69a 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -34,6 +34,7 @@ #include "distributed/function_utils.h" #include "distributed/foreign_key_relationship.h" #include "distributed/master_metadata_utility.h" +#include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/pg_dist_local_group.h" @@ -55,7 +56,6 @@ #include "parser/parse_type.h" #include "storage/lmgr.h" #include "utils/builtins.h" -#include "utils/catcache.h" #include "utils/datum.h" #include "utils/elog.h" #include "utils/hsearch.h" @@ -163,6 +163,9 @@ static HTAB *DistTableCacheHash = NULL; static HTAB *DistShardCacheHash = NULL; static MemoryContext MetadataCacheMemoryContext = NULL; +/* Hash table for information about each object */ +static HTAB *DistObjectCacheHash = NULL; + /* Hash table for informations about worker nodes */ static HTAB *WorkerNodeHash = NULL; static WorkerNode **WorkerNodeArray = NULL; @@ -172,9 +175,10 @@ static bool workerNodeHashValid = false; /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ static int32 LocalGroupId = -1; -/* built first time through in InitializeDistTableCache */ +/* built first time through in InitializeDistCache */ static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistShardScanKey[1]; +static ScanKeyData DistObjectScanKey[3]; /* local function forward declarations */ @@ -197,7 +201,8 @@ static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, int shardIntervalArrayLength, FmgrInfo *shardIntervalSortCompareFunction); static void InitializeCaches(void); -static void InitializeDistTableCache(void); +static void InitializeDistCache(void); +static void InitializeDistObjectCache(void); static void InitializeWorkerNodeCache(void); static void RegisterForeignKeyGraphCacheCallbacks(void); static void RegisterWorkerNodeCacheCallbacks(void); @@ -205,6 +210,7 @@ static void RegisterLocalGroupIdCacheCallbacks(void); static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void CreateDistTableCache(void); +static void CreateDistObjectCache(void); static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); @@ -224,7 +230,8 @@ static void CachedRelationNamespaceLookup(const char *relationName, Oid relnames static ShardPlacement * ResolveGroupShardPlacement( GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); static Oid LookupEnumValueId(Oid typeId, char *valueName); -static void InvalidateEntireDistCache(void); +static void 
InvalidateEntireDistCache(void); +static void InvalidateDistTableCache(void); +static void InvalidateDistObjectCache(void); /* exports for SQL callable functions */ @@ -234,6 +241,7 @@ PG_FUNCTION_INFO_V1(master_dist_placement_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_authinfo_cache_invalidate); +PG_FUNCTION_INFO_V1(master_dist_object_cache_invalidate); PG_FUNCTION_INFO_V1(role_exists); PG_FUNCTION_INFO_V1(authinfo_valid); PG_FUNCTION_INFO_V1(poolinfo_valid); @@ -789,9 +797,8 @@ LookupShardCacheEntry(int64 shardId) DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId) { - DistTableCacheEntry *cacheEntry = NULL; - - cacheEntry = LookupDistTableCacheEntry(distributedRelationId); + DistTableCacheEntry *cacheEntry = + LookupDistTableCacheEntry(distributedRelationId); if (cacheEntry && cacheEntry->isDistributedTable) { @@ -805,6 +812,26 @@ DistributedTableCacheEntry(Oid distributedRelationId) } + +/* + * DistributedObjectCacheEntry loads a record from citus.pg_dist_object, erroring + * out if the object is not distributed. + */ +DistObjectCacheEntry * +DistributedObjectCacheEntry(Oid classid, Oid objid, int32 objsubid) +{ + DistObjectCacheEntry *cacheEntry = + LookupDistObjectCacheEntry(classid, objid, objsubid); + + if (cacheEntry) + { + return cacheEntry; + } + else + { + ereport(ERROR, (errmsg("object is not distributed"))); + } +} + + /* * LookupDistTableCacheEntry returns the distributed table metadata for the * passed relationId. For efficiency it caches lookups. @@ -899,6 +926,107 @@ LookupDistTableCacheEntry(Oid relationId) } + +/* + * LookupDistObjectCacheEntry returns the distributed object metadata for the + * object identified by (classid, objid, objsubid). For efficiency it caches lookups. + */ +DistObjectCacheEntry * +LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid) +{ + DistObjectCacheEntry *cacheEntry = NULL; + bool foundInCache = false; + DistObjectCacheEntryKey hashKey; + Relation pgDistObjectRel = NULL; + TupleDesc pgDistObjectTupleDesc = NULL; + ScanKeyData pgDistObjectKey[3]; + SysScanDesc pgDistObjectScan = NULL; + HeapTuple pgDistObjectTup = NULL; + + memset(&hashKey, 0, sizeof(DistObjectCacheEntryKey)); + hashKey.classid = classid; + hashKey.objid = objid; + hashKey.objsubid = objsubid; + + /* + * Can't be a distributed object if the extension hasn't been loaded + * yet. As we can't do lookups in nonexistent tables, directly return NULL + * here. + */ + if (!CitusHasBeenLoaded()) + { + return NULL; + } + + InitializeCaches(); + + cacheEntry = hash_search(DistObjectCacheHash, &hashKey, HASH_ENTER, &foundInCache); + + /* return valid matches */ + if (foundInCache) + { + /* + * We might have some concurrent metadata changes. In order to get the changes, + * we first need to accept the cache invalidation messages.
+ */ + AcceptInvalidationMessages(); + + if (cacheEntry->isValid) + { + return cacheEntry; + } + + /* this is where we'd free old entry's out of band data if it had any */ + } + + /* zero out entry, but not the key part */ + memset(((char *) cacheEntry), 0, sizeof(DistObjectCacheEntry)); + cacheEntry->key.classid = classid; + cacheEntry->key.objid = objid; + cacheEntry->key.objsubid = objsubid; + + pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock); + pgDistObjectTupleDesc = RelationGetDescr(pgDistObjectRel); + + ScanKeyInit(&pgDistObjectKey[0], Anum_pg_dist_object_classid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(classid)); + ScanKeyInit(&pgDistObjectKey[1], Anum_pg_dist_object_objid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(objid)); + ScanKeyInit(&pgDistObjectKey[2], Anum_pg_dist_object_objsubid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(objsubid)); + + pgDistObjectScan = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(), + true, NULL, 3, pgDistObjectKey); + pgDistObjectTup = systable_getnext(pgDistObjectScan); + + if (HeapTupleIsValid(pgDistObjectTup)) + { + Datum datumArray[Natts_pg_dist_object]; + bool isNullArray[Natts_pg_dist_object]; + + heap_deform_tuple(pgDistObjectTup, pgDistObjectTupleDesc, datumArray, + isNullArray); + + cacheEntry->isValid = true; + + cacheEntry->distributionArgIndex = + DatumGetInt32(datumArray[Anum_pg_dist_object_distribution_argument_index - + 1]); + cacheEntry->colocationId = + DatumGetInt32(datumArray[Anum_pg_dist_object_colocationid - 1]); + } + else + { + /* return NULL, cacheEntry left invalid in hash table */ + cacheEntry = NULL; + } + + systable_endscan(pgDistObjectScan); + relation_close(pgDistObjectRel, AccessShareLock); + + return cacheEntry; +} + + /* * BuildDistTableCacheEntry is a helper routine for * LookupDistTableCacheEntry() for building the cache contents. @@ -2643,6 +2771,31 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) } + +/* + * master_dist_object_cache_invalidate is a trigger function that performs relcache + * invalidation when the contents of pg_dist_object are changed on the SQL + * level. + * + * NB: We decided there is little point in checking permissions here, there + * are much easier ways to waste CPU than causing cache invalidations. + */ +Datum +master_dist_object_cache_invalidate(PG_FUNCTION_ARGS) +{ + if (!CALLED_AS_TRIGGER(fcinfo)) + { + ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("must be called as trigger"))); + } + + CheckCitusVersion(ERROR); + + CitusInvalidateRelcacheByRelid(DistObjectRelationId()); + + PG_RETURN_DATUM(PointerGetDatum(NULL)); +} + + /* * InitializeCaches() registers invalidation handlers for metadata_cache.c's * caches.
@@ -2670,18 +2823,12 @@ InitializeCaches(void) /* set first, to avoid recursion dangers */ performedInitialization = true; - /* make sure we've initialized CacheMemoryContext */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } - MetadataCacheMemoryContext = AllocSetContextCreate( CacheMemoryContext, "MetadataCacheMemoryContext", ALLOCSET_DEFAULT_SIZES); - InitializeDistTableCache(); + InitializeDistCache(); RegisterForeignKeyGraphCacheCallbacks(); RegisterWorkerNodeCacheCallbacks(); RegisterLocalGroupIdCacheCallbacks(); @@ -2708,7 +2855,7 @@ InitializeCaches(void) /* initialize the infrastructure for the metadata cache */ static void -InitializeDistTableCache(void) +InitializeDistCache(void) { HASHCTL info; @@ -2733,9 +2880,10 @@ InitializeDistTableCache(void) DistShardScanKey[0].sk_collation = InvalidOid; DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid; - /* initialize the per-table hash table */ CreateDistTableCache(); + InitializeDistObjectCache(); + /* initialize the per-shard hash table */ MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(int64); @@ -2752,6 +2900,40 @@ InitializeDistTableCache(void) } +static void +InitializeDistObjectCache(void) +{ + /* build initial scan keys, copied for every relation scan */ + memset(&DistObjectScanKey, 0, sizeof(DistObjectScanKey)); + + fmgr_info_cxt(F_OIDEQ, + &DistObjectScanKey[0].sk_func, + MetadataCacheMemoryContext); + DistObjectScanKey[0].sk_strategy = BTEqualStrategyNumber; + DistObjectScanKey[0].sk_subtype = InvalidOid; + DistObjectScanKey[0].sk_collation = InvalidOid; + DistObjectScanKey[0].sk_attno = Anum_pg_dist_object_classid; + + fmgr_info_cxt(F_OIDEQ, + &DistObjectScanKey[1].sk_func, + MetadataCacheMemoryContext); + DistObjectScanKey[1].sk_strategy = BTEqualStrategyNumber; + DistObjectScanKey[1].sk_subtype = InvalidOid; + DistObjectScanKey[1].sk_collation = InvalidOid; + DistObjectScanKey[1].sk_attno = Anum_pg_dist_object_objid; + + fmgr_info_cxt(F_INT4EQ, + &DistObjectScanKey[2].sk_func, + MetadataCacheMemoryContext); + DistObjectScanKey[2].sk_strategy = BTEqualStrategyNumber; + DistObjectScanKey[2].sk_subtype = InvalidOid; + DistObjectScanKey[2].sk_collation = InvalidOid; + DistObjectScanKey[2].sk_attno = Anum_pg_dist_object_objsubid; + + CreateDistObjectCache(); +} + + /* * GetWorkerNodeHash returns the worker node data as a hash with the nodename and * nodeport as a key. 
@@ -3141,7 +3323,7 @@ InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId) if (relationId == MetadataCache.distColocationRelationId) { SetForeignConstraintRelationshipGraphInvalid(); - InvalidateEntireDistCache(); + InvalidateDistTableCache(); } } @@ -3180,7 +3362,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) /* invalidate either entire cache or a specific entry */ if (relationId == InvalidOid) { - InvalidateEntireDistCache(); + InvalidateDistTableCache(); + InvalidateDistObjectCache(); } else { @@ -3194,25 +3377,30 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) { cacheEntry->isValid = false; } - } - /* - * If pg_dist_partition is being invalidated drop all state - * This happens pretty rarely, but most importantly happens during - * DROP EXTENSION citus; - */ - if (relationId != InvalidOid && relationId == MetadataCache.distPartitionRelationId) - { - InvalidateMetadataSystemCache(); + /* + * If pg_dist_partition is being invalidated drop all state + * This happens pretty rarely, but most importantly happens during + * DROP EXTENSION citus; + */ + if (relationId == MetadataCache.distPartitionRelationId) + { + InvalidateMetadataSystemCache(); + } + + if (relationId == MetadataCache.distObjectRelationId) + { + InvalidateDistObjectCache(); + } } } /* - * InvalidateEntireDistCache makes entire cache entries invalid. + * InvalidateDistTableCache marks all DistTableCacheHash entries invalid. */ static void -InvalidateEntireDistCache(void) +InvalidateDistTableCache(void) { DistTableCacheEntry *cacheEntry = NULL; HASH_SEQ_STATUS status; @@ -3226,6 +3414,24 @@ InvalidateEntireDistCache(void) } +/* + * InvalidateDistObjectCache marks all DistObjectCacheHash entries invalid. + */ +static void +InvalidateDistObjectCache(void) +{ + DistObjectCacheEntry *cacheEntry = NULL; + HASH_SEQ_STATUS status; + + hash_seq_init(&status, DistObjectCacheHash); + + while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL) + { + cacheEntry->isValid = false; + } +} + + /* * FlushDistTableCache flushes the entire distributed relation cache, frees * all entries, and recreates the cache. @@ -3264,6 +3470,22 @@ CreateDistTableCache(void) } +/* CreateDistObjectCache initializes the per-object hash table */ +static void +CreateDistObjectCache(void) +{ + HASHCTL info; + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(DistObjectCacheEntryKey); + info.entrysize = sizeof(DistObjectCacheEntry); + info.hash = tag_hash; + info.hcxt = MetadataCacheMemoryContext; + DistObjectCacheHash = + hash_create("Distributed Object Cache", 32, &info, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); +} + + /* * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag, * and invalidates the worker node, ConnParams, and local group ID caches. 
@@ -3309,9 +3531,7 @@ DistTableOidList(void) Datum relationIdDatum = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid, tupleDescriptor, &isNull); - relationId = DatumGetObjectId(relationIdDatum); - distTableOidList = lappend_oid(distTableOidList, relationId); heapTuple = systable_getnext(scanDescriptor); @@ -3410,7 +3630,8 @@ LookupDistShardTuples(Oid relationId) /* set scan arguments */ scanKey[0].sk_argument = ObjectIdGetDatum(relationId); - scanDescriptor = systable_beginscan(pgDistShard, DistShardLogicalRelidIndexId(), true, + scanDescriptor = systable_beginscan(pgDistShard, + DistShardLogicalRelidIndexId(), true, NULL, 1, scanKey); currentShardTuple = systable_getnext(scanDescriptor); @@ -3539,7 +3760,8 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, * ShardInterval using the provided descriptor and partition type information. */ static ShardInterval * -TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid intervalTypeId, +TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid + intervalTypeId, int32 intervalTypeMod) { ShardInterval *shardInterval = NULL; @@ -3569,7 +3791,8 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva */ heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]); + relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - + 1]); shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]); storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]); minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1]; @@ -3584,8 +3807,10 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva char *maxValueString = TextDatumGetCString(maxValueTextDatum); /* TODO: move this up the call stack to avoid per-tuple invocation? */ - get_type_io_data(intervalTypeId, IOFunc_input, &intervalTypeLen, &intervalByVal, - &intervalAlign, &intervalDelim, &typeIoParam, &inputFunctionId); + get_type_io_data(intervalTypeId, IOFunc_input, &intervalTypeLen, + &intervalByVal, + &intervalAlign, &intervalDelim, &typeIoParam, + &inputFunctionId); /* finally convert min/max values to their actual types */ minValue = OidInputFunctionCall(inputFunctionId, minValueString, @@ -3649,7 +3874,8 @@ CachedRelationLookup(const char *relationName, Oid *cachedOid) static void -CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, Oid *cachedOid) +CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, + Oid *cachedOid) { /* force callbacks to be registered, so we always get notified upon changes */ InitializeCaches(); @@ -3660,8 +3886,9 @@ CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, Oid *c if (*cachedOid == InvalidOid) { - ereport(ERROR, (errmsg("cache lookup failed for %s, called too early?", - relationName))); + ereport(ERROR, (errmsg( + "cache lookup failed for %s, called too early?", + relationName))); } } } @@ -3738,8 +3965,9 @@ CitusInvalidateRelcacheByShardId(int64 shardId) * * Hence we just emit a DEBUG5 message. 
*/ - ereport(DEBUG5, (errmsg("could not find distributed relation to invalidate for " - "shard "INT64_FORMAT, shardId))); + ereport(DEBUG5, (errmsg( + "could not find distributed relation to invalidate for " + "shard "INT64_FORMAT, shardId))); } systable_endscan(scanDescriptor); @@ -3766,7 +3994,8 @@ DistNodeMetadata(void) Relation pgDistNodeMetadata = NULL; TupleDesc tupleDescriptor = NULL; - metadataTableOid = get_relname_relid("pg_dist_node_metadata", PG_CATALOG_NAMESPACE); + metadataTableOid = get_relname_relid("pg_dist_node_metadata", + PG_CATALOG_NAMESPACE); if (metadataTableOid == InvalidOid) { ereport(ERROR, (errmsg("pg_dist_node_metadata was not found"))); @@ -3788,7 +4017,8 @@ DistNodeMetadata(void) } else { - ereport(ERROR, (errmsg("could not find any entries in pg_dist_metadata"))); + ereport(ERROR, (errmsg( + "could not find any entries in pg_dist_metadata"))); } systable_endscan(scanDescriptor); @@ -3821,11 +4051,13 @@ authinfo_valid(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot write to pg_dist_authinfo"), - errdetail("Citus Community Edition does not support the use of " - "custom authentication options."), - errhint("To learn more about using advanced authentication schemes " - "with Citus, please contact us at " - "https://citusdata.com/about/contact_us"))); + errdetail( + "Citus Community Edition does not support the use of " + "custom authentication options."), + errhint( + "To learn more about using advanced authentication schemes " + "with Citus, please contact us at " + "https://citusdata.com/about/contact_us"))); } @@ -3838,8 +4070,9 @@ poolinfo_valid(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot write to pg_dist_poolinfo"), - errdetail("Citus Community Edition does not support the use of " - "pooler options."), + errdetail( + "Citus Community Edition does not support the use of " + "pooler options."), errhint("To learn more about using advanced pooling schemes " "with Citus, please contact us at " "https://citusdata.com/about/contact_us"))); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index ca3c3b1d3..c6285f911 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -17,11 +17,18 @@ #include "utils/rel.h" #include "nodes/parsenodes.h" +#include "tcop/dest.h" /* cluster.c - forward declarations */ extern List * PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterCommand); +#if PG_VERSION_NUM >= 110000 + +/* call.c */ +extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, const char *callCommand, + DestReceiver *dest); +#endif /* PG_VERSION_NUM >= 110000 */ /* extension.c - forward declarations */ extern bool IsCitusExtensionStmt(Node *parsetree); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 5b0b47400..2f750d7e8 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -89,6 +89,24 @@ typedef struct int *arrayOfPlacementArrayLengths; } DistTableCacheEntry; +typedef struct DistObjectCacheEntryKey +{ + Oid classid; + Oid objid; + int32 objsubid; +} DistObjectCacheEntryKey; + +typedef struct DistObjectCacheEntry +{ + /* lookup key - must be first. 
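+ * (CreateDistObjectCache registers tag_hash with keysize = sizeof(DistObjectCacheEntryKey), so the hash and key comparison use the leading bytes of each entry)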
*/ + DistObjectCacheEntryKey key; + + bool isValid; + + int distributionArgIndex; + int colocationId; +} DistObjectCacheEntry; + extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); @@ -99,6 +117,10 @@ extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId) extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); +extern DistObjectCacheEntry * DistributedObjectCacheEntry(Oid classid, Oid objid, int32 + objsubid); +extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 + objsubid); extern int32 GetLocalGroupId(void); extern List * DistTableOidList(void); extern Oid LookupShardRelation(int64 shardId, bool missing_ok);
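For reviewers, a minimal SQL sketch (not part of this diff) of how the new CALL delegation path would be exercised. It assumes the create_distributed_function() UDF introduced elsewhere in this release, with distribution_arg_name and colocate_with parameters; the table, procedure, and argument names below are made up for illustration.

-- a distributed table and a procedure colocated with it
CREATE TABLE orders (order_id bigint, region_id int, total numeric);
SELECT create_distributed_table('orders', 'region_id');

CREATE PROCEDURE add_order(p_order_id bigint, p_region_id int, p_total numeric)
LANGUAGE plpgsql AS $$
BEGIN
    INSERT INTO orders VALUES (p_order_id, p_region_id, p_total);
END;
$$;

-- mark the procedure as distributed, keyed on its second argument and
-- colocated with the orders table
SELECT create_distributed_function('add_order(bigint,int,numeric)',
                                   distribution_arg_name := 'p_region_id',
                                   colocate_with := 'orders');

-- issued as a top-level statement with a constant distribution argument, this
-- CALL can be delegated to the metadata-enabled worker that holds the shard of
-- orders covering region_id = 42; inside a transaction block, or with a
-- non-constant argument, it falls back to normal local execution
CALL add_order(1, 42, 19.99);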