mirror of https://github.com/citusdata/citus.git
Test
parent
e9508b2603
commit
63c1aaef1b
|
@ -181,7 +181,7 @@ EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *tupleDestinationSt
|
||||||
static TupleDesc
|
static TupleDesc
|
||||||
TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
|
TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
|
||||||
{
|
{
|
||||||
Assert(queryNumber == 0);
|
/*Assert(queryNumber == 0); */
|
||||||
|
|
||||||
TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self;
|
TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self;
|
||||||
|
|
||||||
|
|
|
@ -122,7 +122,6 @@ static List * GrantOnSequenceDDLCommands(Oid sequenceOid);
|
||||||
static List * GenerateGrantOnSequenceQueriesFromAclItem(Oid sequenceOid,
|
static List * GenerateGrantOnSequenceQueriesFromAclItem(Oid sequenceOid,
|
||||||
AclItem *aclItem);
|
AclItem *aclItem);
|
||||||
static void SetLocalReplicateReferenceTablesOnActivate(bool state);
|
static void SetLocalReplicateReferenceTablesOnActivate(bool state);
|
||||||
static char * GenerateSetRoleQuery(Oid roleOid);
|
|
||||||
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
|
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
|
||||||
static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
|
static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
|
||||||
|
|
||||||
|
@ -2484,7 +2483,7 @@ SetLocalReplicateReferenceTablesOnActivate(bool state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static char *
|
char *
|
||||||
GenerateSetRoleQuery(Oid roleOid)
|
GenerateSetRoleQuery(Oid roleOid)
|
||||||
{
|
{
|
||||||
StringInfo buf = makeStringInfo();
|
StringInfo buf = makeStringInfo();
|
||||||
|
|
|
@ -1156,6 +1156,28 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
*/
|
*/
|
||||||
CheckNodeIsDumpable((Node *) logicalPlan);
|
CheckNodeIsDumpable((Node *) logicalPlan);
|
||||||
|
|
||||||
|
RTEListProperties *checkForViews = GetRTEListPropertiesForQuery(originalQuery);
|
||||||
|
|
||||||
|
if (checkForViews->hasView)
|
||||||
|
{
|
||||||
|
HeapTuple tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(
|
||||||
|
checkForViews->viewId));
|
||||||
|
if (!HeapTupleIsValid(tuple))
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_UNDEFINED_TABLE),
|
||||||
|
errmsg("relation with OID %u does not exist",
|
||||||
|
checkForViews->viewId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Oid ownerId = ((Form_pg_class) GETSTRUCT(tuple))->relowner;
|
||||||
|
|
||||||
|
ReleaseSysCache(tuple);
|
||||||
|
|
||||||
|
plannerRestrictionContext->relationRestrictionContext->hasView = true;
|
||||||
|
plannerRestrictionContext->relationRestrictionContext->viewOwnerId = ownerId;
|
||||||
|
}
|
||||||
|
|
||||||
/* Create the physical plan */
|
/* Create the physical plan */
|
||||||
distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
|
distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
|
@ -2497,6 +2519,8 @@ GetRTEListProperties(List *rangeTableList)
|
||||||
* Skip over views, distributed tables within (regular) views are
|
* Skip over views, distributed tables within (regular) views are
|
||||||
* already in rangeTableList.
|
* already in rangeTableList.
|
||||||
*/
|
*/
|
||||||
|
rteListProperties->hasView = true;
|
||||||
|
rteListProperties->viewId = rangeTableEntry->relid;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
#include "distributed/intermediate_results.h"
|
#include "distributed/intermediate_results.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/metadata_sync.h"
|
||||||
#include "distributed/multi_router_planner.h"
|
#include "distributed/multi_router_planner.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
#include "distributed/multi_logical_optimizer.h"
|
#include "distributed/multi_logical_optimizer.h"
|
||||||
|
@ -2074,6 +2075,11 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
|
||||||
for (int32 jobIndex = (flattenedJobCount - 1); jobIndex >= 0; jobIndex--)
|
for (int32 jobIndex = (flattenedJobCount - 1); jobIndex >= 0; jobIndex--)
|
||||||
{
|
{
|
||||||
Job *job = (Job *) list_nth(flattenedJobList, jobIndex);
|
Job *job = (Job *) list_nth(flattenedJobList, jobIndex);
|
||||||
|
if (plannerRestrictionContext->relationRestrictionContext->hasView)
|
||||||
|
{
|
||||||
|
job->userId =
|
||||||
|
plannerRestrictionContext->relationRestrictionContext->viewOwnerId;
|
||||||
|
}
|
||||||
List *sqlTaskList = NIL;
|
List *sqlTaskList = NIL;
|
||||||
ListCell *assignedSqlTaskCell = NULL;
|
ListCell *assignedSqlTaskCell = NULL;
|
||||||
|
|
||||||
|
@ -2713,6 +2719,12 @@ SqlTaskList(Job *job)
|
||||||
|
|
||||||
Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, READ_TASK,
|
Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, READ_TASK,
|
||||||
sqlQueryString->data);
|
sqlQueryString->data);
|
||||||
|
if (job->userId != InvalidOid)
|
||||||
|
{
|
||||||
|
List *cmds = list_make2(GenerateSetRoleQuery(job->userId),
|
||||||
|
sqlQueryString->data);
|
||||||
|
SetTaskQueryStringList(sqlTask, cmds);
|
||||||
|
}
|
||||||
sqlTask->dependentTaskList = dataFetchTaskList;
|
sqlTask->dependentTaskList = dataFetchTaskList;
|
||||||
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
|
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
|
||||||
fragmentCombination);
|
fragmentCombination);
|
||||||
|
|
|
@ -36,6 +36,8 @@ extern int PlannerLevel;
|
||||||
|
|
||||||
typedef struct RelationRestrictionContext
|
typedef struct RelationRestrictionContext
|
||||||
{
|
{
|
||||||
|
bool hasView;
|
||||||
|
Oid viewOwnerId;
|
||||||
bool allReferenceTables;
|
bool allReferenceTables;
|
||||||
List *relationRestrictionList;
|
List *relationRestrictionList;
|
||||||
} RelationRestrictionContext;
|
} RelationRestrictionContext;
|
||||||
|
@ -148,6 +150,9 @@ typedef struct RTEListProperties
|
||||||
bool hasCitusTable;
|
bool hasCitusTable;
|
||||||
|
|
||||||
bool hasMaterializedView;
|
bool hasMaterializedView;
|
||||||
|
|
||||||
|
bool hasView;
|
||||||
|
Oid viewId;
|
||||||
} RTEListProperties;
|
} RTEListProperties;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -110,6 +110,7 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
|
||||||
Oid distributionColumType,
|
Oid distributionColumType,
|
||||||
Oid distributionColumnCollation);
|
Oid distributionColumnCollation);
|
||||||
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
|
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
|
||||||
|
extern char * GenerateSetRoleQuery(Oid roleOid);
|
||||||
|
|
||||||
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
|
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
|
||||||
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"
|
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"
|
||||||
|
|
|
@ -136,6 +136,7 @@ typedef struct Job
|
||||||
CitusNode type;
|
CitusNode type;
|
||||||
uint64 jobId;
|
uint64 jobId;
|
||||||
Query *jobQuery;
|
Query *jobQuery;
|
||||||
|
Oid userId;
|
||||||
List *taskList;
|
List *taskList;
|
||||||
List *dependentJobList;
|
List *dependentJobList;
|
||||||
bool subqueryPushdown;
|
bool subqueryPushdown;
|
||||||
|
|
Loading…
Reference in New Issue