mirror of https://github.com/citusdata/citus.git
use address method to decide if we should run preprocess and postprocess steps for a distributed object
parent
8866d9ac32
commit
57ce4cf8c4
|
@ -45,6 +45,7 @@
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "distributed/adaptive_executor.h"
|
#include "distributed/adaptive_executor.h"
|
||||||
#include "distributed/backend_data.h"
|
#include "distributed/backend_data.h"
|
||||||
|
#include "distributed/citus_depended_object.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/commands.h"
|
#include "distributed/commands.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
#include "distributed/commands/multi_copy.h"
|
||||||
|
@ -376,6 +377,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
{
|
{
|
||||||
Node *parsetree = pstmt->utilityStmt;
|
Node *parsetree = pstmt->utilityStmt;
|
||||||
List *ddlJobs = NIL;
|
List *ddlJobs = NIL;
|
||||||
|
bool distOpsHasInvalidObject = false;
|
||||||
|
|
||||||
if (IsA(parsetree, ExplainStmt) &&
|
if (IsA(parsetree, ExplainStmt) &&
|
||||||
IsA(((ExplainStmt *) parsetree)->query, Query))
|
IsA(((ExplainStmt *) parsetree)->query, Query))
|
||||||
|
@ -542,6 +544,16 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
parsetree = pstmt->utilityStmt;
|
parsetree = pstmt->utilityStmt;
|
||||||
ops = GetDistributeObjectOps(parsetree);
|
ops = GetDistributeObjectOps(parsetree);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Preprocess and qualify steps can cause pg tests to fail because of the
|
||||||
|
* unwanted citus related warnings or early error logs related to invalid address.
|
||||||
|
* Therefore, we first check if all addresses in the given statement are valid.
|
||||||
|
* Then, we do not execute qualify and preprocess if any address is invalid to
|
||||||
|
* prevent before-mentioned citus related messages. PG will complain about the
|
||||||
|
* invalid address, so we are safe to not execute qualify and preprocess.
|
||||||
|
*/
|
||||||
|
distOpsHasInvalidObject = DistOpsHasInvalidObject(parsetree, ops);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For some statements Citus defines a Qualify function. The goal of this function
|
* For some statements Citus defines a Qualify function. The goal of this function
|
||||||
* is to take any ambiguity from the statement that is contextual on either the
|
* is to take any ambiguity from the statement that is contextual on either the
|
||||||
|
@ -551,12 +563,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
* deserialize calls for the statement portable to other postgres servers, the
|
* deserialize calls for the statement portable to other postgres servers, the
|
||||||
* workers in our case.
|
* workers in our case.
|
||||||
*/
|
*/
|
||||||
if (ops && ops->qualify)
|
if (ops && ops->qualify && !distOpsHasInvalidObject)
|
||||||
{
|
{
|
||||||
ops->qualify(parsetree);
|
ops->qualify(parsetree);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ops && ops->preprocess)
|
if (ops && ops->preprocess && !distOpsHasInvalidObject)
|
||||||
{
|
{
|
||||||
ddlJobs = ops->preprocess(parsetree, queryString, context);
|
ddlJobs = ops->preprocess(parsetree, queryString, context);
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,3 +308,35 @@ GetCitusDependedObjectArgs(int pgMetaTableVarno, int pgMetaTableOid)
|
||||||
|
|
||||||
return list_make2((Node *) metaTableOidConst, (Node *) oidVar);
|
return list_make2((Node *) metaTableOidConst, (Node *) oidVar);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DistOpsHasInvalidObject returns true if any address in the given node
|
||||||
|
* is invalid; otherwise, returns false. If ops is null or it has no
|
||||||
|
* implemented address method, we return false.
|
||||||
|
*
|
||||||
|
* If EnableUnsupportedFeatureMessages is active, then we return false.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
DistOpsHasInvalidObject(Node *node, const DistributeObjectOps *ops)
|
||||||
|
{
|
||||||
|
if (EnableUnsupportedFeatureMessages)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ops && ops->address)
|
||||||
|
{
|
||||||
|
List *objectAddresses = ops->address(node, true);
|
||||||
|
ObjectAddress *objectAddress = NULL;
|
||||||
|
foreach_ptr(objectAddress, objectAddresses)
|
||||||
|
{
|
||||||
|
if (!OidIsValid(objectAddress->objectId))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
#ifndef CITUS_DEPENDED_OBJECT_H
|
#ifndef CITUS_DEPENDED_OBJECT_H
|
||||||
#define CITUS_DEPENDED_OBJECT_H
|
#define CITUS_DEPENDED_OBJECT_H
|
||||||
|
|
||||||
|
#include "distributed/commands.h"
|
||||||
#include "nodes/nodes.h"
|
#include "nodes/nodes.h"
|
||||||
#include "nodes/parsenodes.h"
|
#include "nodes/parsenodes.h"
|
||||||
|
|
||||||
|
@ -22,5 +23,6 @@ extern void SetLocalClientMinMessagesIfRunningPGTests(int
|
||||||
extern void SetLocalHideCitusDependentObjectsDisabledWhenAlreadyEnabled(void);
|
extern void SetLocalHideCitusDependentObjectsDisabledWhenAlreadyEnabled(void);
|
||||||
extern bool HideCitusDependentObjectsOnQueriesOfPgMetaTables(Node *node, void *context);
|
extern bool HideCitusDependentObjectsOnQueriesOfPgMetaTables(Node *node, void *context);
|
||||||
extern bool IsPgLocksTable(RangeTblEntry *rte);
|
extern bool IsPgLocksTable(RangeTblEntry *rte);
|
||||||
|
extern bool DistOpsHasInvalidObject(Node *node, const DistributeObjectOps *ops);
|
||||||
|
|
||||||
#endif /* CITUS_DEPENDED_OBJECT_H */
|
#endif /* CITUS_DEPENDED_OBJECT_H */
|
||||||
|
|
Loading…
Reference in New Issue