Hadi's feedback

Co-authored-by: pykello <hadi.moshayedi@microsoft.com>
Co-authored-by: serprex <serprex@users.noreply.github.com>
pull/3026/head
Philip Dubé 2019-09-19 16:58:20 +00:00 committed by Philip Dubé
parent 16b8d17aba
commit 432a8ef85b
5 changed files with 21 additions and 42 deletions

View File

@ -39,7 +39,7 @@ static bool CallFuncExprRemotely(CallStmt *callStmt,
DestReceiver *dest); DestReceiver *dest);
/* /*
* CallDistributedProcedure calls a stored procedure on the worker if possible. * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible.
*/ */
bool bool
CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString, CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString,
@ -71,7 +71,6 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
List *placementList = NIL; List *placementList = NIL;
ListCell *argCell = NULL; ListCell *argCell = NULL;
WorkerNode *preferredWorkerNode = NULL;
DistTableCacheEntry *distTable = NULL; DistTableCacheEntry *distTable = NULL;
ShardPlacement *placement = NULL; ShardPlacement *placement = NULL;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
@ -128,13 +127,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
placement = (ShardPlacement *) linitial(placementList); placement = (ShardPlacement *) linitial(placementList);
workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
if (workerNode->hasMetadata) if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced)
{
/* we can execute this procedure on the worker! */
preferredWorkerNode = workerNode;
}
if (preferredWorkerNode == NULL)
{ {
ereport(DEBUG2, (errmsg("there is no worker node with metadata"))); ereport(DEBUG2, (errmsg("there is no worker node with metadata")));
return false; return false;

View File

@ -32,7 +32,7 @@ COMMENT ON FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, obj
IS 'remove an object address from citus.pg_dist_object once the object has been deleted'; IS 'remove an object address from citus.pg_dist_object once the object has been deleted';
CREATE TABLE citus.pg_dist_object ( CREATE TABLE citus.pg_dist_object (
-- primary key -- fields used for composite primary key
classid oid NOT NULL, classid oid NOT NULL,
objid oid NOT NULL, objid oid NOT NULL,
objsubid integer NOT NULL, objsubid integer NOT NULL,

View File

@ -56,6 +56,7 @@
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h" #include "utils/datum.h"
#include "utils/elog.h" #include "utils/elog.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -812,26 +813,6 @@ DistributedTableCacheEntry(Oid distributedRelationId)
} }
/*
* DistributedProcedureRecord loads a record from citus.pg_dist_object.
*/
DistObjectCacheEntry *
DistributedObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
{
DistObjectCacheEntry *cacheEntry =
LookupDistObjectCacheEntry(classid, objid, objsubid);
if (cacheEntry)
{
return cacheEntry;
}
else
{
ereport(ERROR, (errmsg("object is not distributed")));
}
}
/* /*
* LookupDistTableCacheEntry returns the distributed table metadata for the * LookupDistTableCacheEntry returns the distributed table metadata for the
* passed relationId. For efficiency it caches lookups. * passed relationId. For efficiency it caches lookups.
@ -975,7 +956,10 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
return cacheEntry; return cacheEntry;
} }
/* this is where we'd free old entry's out of band data if it had any */ /*
* This is where we'd free the old entry's out of band data if it had any.
* Right now we don't have anything to free.
*/
} }
/* zero out entry, but not the key part */ /* zero out entry, but not the key part */
@ -2772,7 +2756,7 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
/* /*
* master_dist_object_invalidate is a trigger function that performs relcache * master_dist_object_cache_invalidate is a trigger function that performs relcache
* invalidation when the contents of pg_dist_object are changed on the SQL * invalidation when the contents of pg_dist_object are changed on the SQL
* level. * level.
* *
@ -2823,6 +2807,12 @@ InitializeCaches(void)
/* set first, to avoid recursion dangers */ /* set first, to avoid recursion dangers */
performedInitialization = true; performedInitialization = true;
/* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
MetadataCacheMemoryContext = AllocSetContextCreate( MetadataCacheMemoryContext = AllocSetContextCreate(
CacheMemoryContext, CacheMemoryContext,
"MetadataCacheMemoryContext", "MetadataCacheMemoryContext",

View File

@ -117,8 +117,6 @@ extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern DistObjectCacheEntry * DistributedObjectCacheEntry(Oid classid, Oid objid, int32
objsubid);
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
objsubid); objsubid);
extern int32 GetLocalGroupId(void); extern int32 GetLocalGroupId(void);

View File

@ -18,14 +18,12 @@
#define QUERY_SEND_FAILED 1 #define QUERY_SEND_FAILED 1
#define RESPONSE_NOT_OKAY 2 #define RESPONSE_NOT_OKAY 2
struct pg_result; /* target of the PGresult typedef */
/* GUC, determining whether statements sent to remote nodes are logged */ /* GUC, determining whether statements sent to remote nodes are logged */
extern bool LogRemoteCommands; extern bool LogRemoteCommands;
/* simple helpers */ /* simple helpers */
extern bool IsResponseOK(struct pg_result *result); extern bool IsResponseOK(PGresult *result);
extern void ForgetResults(MultiConnection *connection); extern void ForgetResults(MultiConnection *connection);
extern bool ClearResults(MultiConnection *connection, bool raiseErrors); extern bool ClearResults(MultiConnection *connection, bool raiseErrors);
extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors); extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors);
@ -34,7 +32,7 @@ extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
/* report errors & warnings */ /* report errors & warnings */
extern void ReportConnectionError(MultiConnection *connection, int elevel); extern void ReportConnectionError(MultiConnection *connection, int elevel);
extern void ReportResultError(MultiConnection *connection, struct pg_result *result, extern void ReportResultError(MultiConnection *connection, PGresult *result,
int elevel); int elevel);
extern char * pchomp(const char *in); extern char * pchomp(const char *in);
extern void LogRemoteCommand(MultiConnection *connection, const char *command); extern void LogRemoteCommand(MultiConnection *connection, const char *command);
@ -46,13 +44,13 @@ extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
const char *command); const char *command);
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
const char *command, const char *command,
struct pg_result **result); PGresult **result);
extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommand(MultiConnection *connection, const char *command);
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command, extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes, int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues); const char *const *parameterValues);
extern List * ReadFirstColumnAsText(struct pg_result *queryResult); extern List * ReadFirstColumnAsText(PGresult *queryResult);
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, extern PGresult * GetRemoteCommandResult(MultiConnection *connection,
bool raiseInterrupts); bool raiseInterrupts);
extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer,
int nbytes); int nbytes);