From 432a8ef85b28eae392a8e9e7c72573367c1df7bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 19 Sep 2019 16:58:20 +0000 Subject: [PATCH] Hadi's feedback Co-authored-by: pykello Co-authored-by: serprex --- src/backend/distributed/commands/call.c | 11 ++---- .../distributed/sql/citus--8.3-1--9.0-1.sql | 2 +- .../distributed/utils/metadata_cache.c | 34 +++++++------------ src/include/distributed/metadata_cache.h | 2 -- src/include/distributed/remote_commands.h | 14 ++++---- 5 files changed, 21 insertions(+), 42 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index d9cc3ee90..86eef856d 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -39,7 +39,7 @@ static bool CallFuncExprRemotely(CallStmt *callStmt, DestReceiver *dest); /* - * CallDistributedProcedure calls a stored procedure on the worker if possible. + * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. */ bool CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString, @@ -71,7 +71,6 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ShardInterval *shardInterval = NULL; List *placementList = NIL; ListCell *argCell = NULL; - WorkerNode *preferredWorkerNode = NULL; DistTableCacheEntry *distTable = NULL; ShardPlacement *placement = NULL; WorkerNode *workerNode = NULL; @@ -128,13 +127,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, placement = (ShardPlacement *) linitial(placementList); workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); - if (workerNode->hasMetadata) - { - /* we can execute this procedure on the worker! */ - preferredWorkerNode = workerNode; - } - - if (preferredWorkerNode == NULL) + if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced) { ereport(DEBUG2, (errmsg("there is no worker node with metadata"))); return false; diff --git a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql index 32e05da72..9c552169c 100644 --- a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql +++ b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql @@ -32,7 +32,7 @@ COMMENT ON FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, obj IS 'remove an object address from citus.pg_dist_object once the object has been deleted'; CREATE TABLE citus.pg_dist_object ( - -- primary key + -- fields used for composite primary key classid oid NOT NULL, objid oid NOT NULL, objsubid integer NOT NULL, diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index b6d04e69a..fa637c5e7 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -56,6 +56,7 @@ #include "parser/parse_type.h" #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/catcache.h" #include "utils/datum.h" #include "utils/elog.h" #include "utils/hsearch.h" @@ -812,26 +813,6 @@ DistributedTableCacheEntry(Oid distributedRelationId) } -/* - * DistributedProcedureRecord loads a record from citus.pg_dist_object. - */ -DistObjectCacheEntry * -DistributedObjectCacheEntry(Oid classid, Oid objid, int32 objsubid) -{ - DistObjectCacheEntry *cacheEntry = - LookupDistObjectCacheEntry(classid, objid, objsubid); - - if (cacheEntry) - { - return cacheEntry; - } - else - { - ereport(ERROR, (errmsg("object is not distributed"))); - } -} - - /* * LookupDistTableCacheEntry returns the distributed table metadata for the * passed relationId. For efficiency it caches lookups. @@ -975,7 +956,10 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid) return cacheEntry; } - /* this is where we'd free old entry's out of band data if it had any */ + /* + * This is where we'd free the old entry's out of band data if it had any. + * Right now we don't have anything to free. + */ } /* zero out entry, but not the key part */ @@ -2772,7 +2756,7 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) /* - * master_dist_object_invalidate is a trigger function that performs relcache + * master_dist_object_cache_invalidate is a trigger function that performs relcache * invalidation when the contents of pg_dist_object are changed on the SQL * level. * @@ -2823,6 +2807,12 @@ InitializeCaches(void) /* set first, to avoid recursion dangers */ performedInitialization = true; + /* make sure we've initialized CacheMemoryContext */ + if (CacheMemoryContext == NULL) + { + CreateCacheMemoryContext(); + } + MetadataCacheMemoryContext = AllocSetContextCreate( CacheMemoryContext, "MetadataCacheMemoryContext", diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 2f750d7e8..f63d26816 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -117,8 +117,6 @@ extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId) extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); -extern DistObjectCacheEntry * DistributedObjectCacheEntry(Oid classid, Oid objid, int32 - objsubid); extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid); extern int32 GetLocalGroupId(void); diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index ca8c9266e..7722cc1e7 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -18,14 +18,12 @@ #define QUERY_SEND_FAILED 1 #define RESPONSE_NOT_OKAY 2 -struct pg_result; /* target of the PGresult typedef */ - /* GUC, determining whether statements sent to remote nodes are logged */ extern bool LogRemoteCommands; /* simple helpers */ -extern bool IsResponseOK(struct pg_result *result); +extern bool IsResponseOK(PGresult *result); extern void ForgetResults(MultiConnection *connection); extern bool ClearResults(MultiConnection *connection, bool raiseErrors); extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors); @@ -34,7 +32,7 @@ extern bool SqlStateMatchesCategory(char *sqlStateString, int category); /* report errors & warnings */ extern void ReportConnectionError(MultiConnection *connection, int elevel); -extern void ReportResultError(MultiConnection *connection, struct pg_result *result, +extern void ReportResultError(MultiConnection *connection, PGresult *result, int elevel); extern char * pchomp(const char *in); extern void LogRemoteCommand(MultiConnection *connection, const char *command); @@ -46,14 +44,14 @@ extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command); extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command, - struct pg_result **result); + PGresult **result); extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommandParams(MultiConnection *connection, const char *command, int parameterCount, const Oid *parameterTypes, const char *const *parameterValues); -extern List * ReadFirstColumnAsText(struct pg_result *queryResult); -extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, - bool raiseInterrupts); +extern List * ReadFirstColumnAsText(PGresult *queryResult); +extern PGresult * GetRemoteCommandResult(MultiConnection *connection, + bool raiseInterrupts); extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes); extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);