mirror of https://github.com/citusdata/citus.git
Rename containsOnlyLocalTable as isLocalTableModification
Update error message in Modify Viewpull/4358/head
parent
13c43d5744
commit
3aed6c3ad0
|
@ -370,7 +370,7 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
|
|
||||||
|
|
||||||
/* We skip shard related things if the job contains only local tables */
|
/* We skip shard related things if the job contains only local tables */
|
||||||
if (!OnlyLocalTableJob(workerJob))
|
if (!ModifyLocalTableJob(workerJob))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Now that we know the shard ID(s) we can acquire the necessary shard metadata
|
* Now that we know the shard ID(s) we can acquire the necessary shard metadata
|
||||||
|
@ -544,12 +544,12 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
|
||||||
shardId = GetAnchorShardId(shardIntervalList);
|
shardId = GetAnchorShardId(shardIntervalList);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool containsOnlyLocalTable = false;
|
bool isLocalTableModification = false;
|
||||||
GenerateSingleShardRouterTaskList(workerJob,
|
GenerateSingleShardRouterTaskList(workerJob,
|
||||||
relationShardList,
|
relationShardList,
|
||||||
placementList,
|
placementList,
|
||||||
shardId,
|
shardId,
|
||||||
containsOnlyLocalTable);
|
isLocalTableModification);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -272,11 +272,11 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* OnlyLocalTableJob true if the given task contains
|
* ModifyLocalTableJob returns true if the given task contains
|
||||||
* only postgres tables
|
* a modification of local table.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
OnlyLocalTableJob(Job *job)
|
ModifyLocalTableJob(Job *job)
|
||||||
{
|
{
|
||||||
if (job == NULL)
|
if (job == NULL)
|
||||||
{
|
{
|
||||||
|
@ -288,7 +288,7 @@ OnlyLocalTableJob(Job *job)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
Task *singleTask = (Task *) linitial(taskList);
|
Task *singleTask = (Task *) linitial(taskList);
|
||||||
return singleTask->containsOnlyLocalTable;
|
return singleTask->isLocalTableModification;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -171,7 +171,7 @@ static int CompareInsertValuesByShardId(const void *leftElement,
|
||||||
static List * SingleShardTaskList(Query *query, uint64 jobId,
|
static List * SingleShardTaskList(Query *query, uint64 jobId,
|
||||||
List *relationShardList, List *placementList,
|
List *relationShardList, List *placementList,
|
||||||
uint64 shardId, bool parametersInQueryResolved,
|
uint64 shardId, bool parametersInQueryResolved,
|
||||||
bool containsOnlyLocalTable);
|
bool isLocalTableModification);
|
||||||
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
||||||
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
||||||
TaskAssignmentPolicyType
|
TaskAssignmentPolicyType
|
||||||
|
@ -321,8 +321,6 @@ IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO:: we might not need this copy*/
|
|
||||||
copyQuery = copyObject(query);
|
|
||||||
RouterJob(copyQuery, plannerRestrictionContext, &deferredErrorMessage);
|
RouterJob(copyQuery, plannerRestrictionContext, &deferredErrorMessage);
|
||||||
return deferredErrorMessage == NULL;
|
return deferredErrorMessage == NULL;
|
||||||
}
|
}
|
||||||
|
@ -1044,7 +1042,8 @@ DeferErrorIfModifyView(Query *queryTree)
|
||||||
firstRangeTableElement->inFromCl == false)
|
firstRangeTableElement->inFromCl == false)
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
"cannot modify views over distributed tables", NULL,
|
"cannot modify views when the query contains citus tables",
|
||||||
|
NULL,
|
||||||
NULL);
|
NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1718,7 +1717,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
/* router planner should create task even if it doesn't hit a shard at all */
|
/* router planner should create task even if it doesn't hit a shard at all */
|
||||||
bool replacePrunedQueryWithDummy = true;
|
bool replacePrunedQueryWithDummy = true;
|
||||||
|
|
||||||
bool containsOnlyLocalTable = false;
|
bool isLocalTableModification = false;
|
||||||
|
|
||||||
/* check if this query requires coordinator evaluation */
|
/* check if this query requires coordinator evaluation */
|
||||||
bool requiresCoordinatorEvaluation = RequiresCoordinatorEvaluation(originalQuery);
|
bool requiresCoordinatorEvaluation = RequiresCoordinatorEvaluation(originalQuery);
|
||||||
|
@ -1748,7 +1747,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
replacePrunedQueryWithDummy,
|
replacePrunedQueryWithDummy,
|
||||||
&isMultiShardModifyQuery,
|
&isMultiShardModifyQuery,
|
||||||
&partitionKeyValue,
|
&partitionKeyValue,
|
||||||
&containsOnlyLocalTable);
|
&isLocalTableModification);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*planningError)
|
if (*planningError)
|
||||||
|
@ -1794,7 +1793,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
GenerateSingleShardRouterTaskList(job, relationShardList,
|
GenerateSingleShardRouterTaskList(job, relationShardList,
|
||||||
placementList, shardId, containsOnlyLocalTable);
|
placementList, shardId,
|
||||||
|
isLocalTableModification);
|
||||||
}
|
}
|
||||||
|
|
||||||
job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
|
job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
|
||||||
|
@ -1811,7 +1811,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
void
|
void
|
||||||
GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
List *placementList, uint64 shardId, bool
|
List *placementList, uint64 shardId, bool
|
||||||
containsOnlyLocalTable)
|
isLocalTableModification)
|
||||||
{
|
{
|
||||||
Query *originalQuery = job->jobQuery;
|
Query *originalQuery = job->jobQuery;
|
||||||
|
|
||||||
|
@ -1821,7 +1821,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
relationShardList, placementList,
|
relationShardList, placementList,
|
||||||
shardId,
|
shardId,
|
||||||
job->parametersInJobQueryResolved,
|
job->parametersInJobQueryResolved,
|
||||||
containsOnlyLocalTable);
|
isLocalTableModification);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Queries to reference tables, or distributed tables with multiple replica's have
|
* Queries to reference tables, or distributed tables with multiple replica's have
|
||||||
|
@ -1838,7 +1838,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
placementList);
|
placementList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (shardId == INVALID_SHARD_ID && !containsOnlyLocalTable)
|
else if (shardId == INVALID_SHARD_ID && !isLocalTableModification)
|
||||||
{
|
{
|
||||||
/* modification that prunes to 0 shards */
|
/* modification that prunes to 0 shards */
|
||||||
job->taskList = NIL;
|
job->taskList = NIL;
|
||||||
|
@ -1849,7 +1849,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
relationShardList, placementList,
|
relationShardList, placementList,
|
||||||
shardId,
|
shardId,
|
||||||
job->parametersInJobQueryResolved,
|
job->parametersInJobQueryResolved,
|
||||||
containsOnlyLocalTable);
|
isLocalTableModification);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1943,7 +1943,7 @@ static List *
|
||||||
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
List *placementList, uint64 shardId,
|
List *placementList, uint64 shardId,
|
||||||
bool parametersInQueryResolved,
|
bool parametersInQueryResolved,
|
||||||
bool containsOnlyLocalTable)
|
bool isLocalTableModification)
|
||||||
{
|
{
|
||||||
TaskType taskType = READ_TASK;
|
TaskType taskType = READ_TASK;
|
||||||
char replicationModel = 0;
|
char replicationModel = 0;
|
||||||
|
@ -2002,7 +2002,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
}
|
}
|
||||||
|
|
||||||
Task *task = CreateTask(taskType);
|
Task *task = CreateTask(taskType);
|
||||||
task->containsOnlyLocalTable = containsOnlyLocalTable;
|
task->isLocalTableModification = isLocalTableModification;
|
||||||
List *relationRowLockList = NIL;
|
List *relationRowLockList = NIL;
|
||||||
|
|
||||||
RowLocksOnRelations((Node *) query, &relationRowLockList);
|
RowLocksOnRelations((Node *) query, &relationRowLockList);
|
||||||
|
@ -2144,7 +2144,7 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
List **prunedShardIntervalListList,
|
List **prunedShardIntervalListList,
|
||||||
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
|
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
|
||||||
Const **partitionValueConst,
|
Const **partitionValueConst,
|
||||||
bool *containsOnlyLocalTable)
|
bool *isLocalTableModification)
|
||||||
{
|
{
|
||||||
bool isMultiShardQuery = false;
|
bool isMultiShardQuery = false;
|
||||||
DeferredErrorMessage *planningError = NULL;
|
DeferredErrorMessage *planningError = NULL;
|
||||||
|
@ -2262,7 +2262,10 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery);
|
RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery);
|
||||||
if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties))
|
if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties))
|
||||||
{
|
{
|
||||||
*containsOnlyLocalTable = true;
|
if (commandType != CMD_SELECT)
|
||||||
|
{
|
||||||
|
*isLocalTableModification = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
bool hasPostgresLocalRelation =
|
bool hasPostgresLocalRelation =
|
||||||
rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView;
|
rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView;
|
||||||
|
|
|
@ -327,7 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS)
|
||||||
COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex);
|
COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex);
|
||||||
COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
|
COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
|
||||||
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
|
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
|
||||||
COPY_SCALAR_FIELD(containsOnlyLocalTable);
|
COPY_SCALAR_FIELD(isLocalTableModification);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -540,7 +540,7 @@ OutTask(OUTFUNC_ARGS)
|
||||||
WRITE_INT_FIELD(fetchedExplainAnalyzePlacementIndex);
|
WRITE_INT_FIELD(fetchedExplainAnalyzePlacementIndex);
|
||||||
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
|
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
|
||||||
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
||||||
WRITE_BOOL_FIELD(containsOnlyLocalTable);
|
WRITE_BOOL_FIELD(isLocalTableModification);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -330,9 +330,9 @@ typedef struct Task
|
||||||
double fetchedExplainAnalyzeExecutionDuration;
|
double fetchedExplainAnalyzeExecutionDuration;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* containsOnlyLocalTable is true if the task contains only postgres table/MV.
|
* isLocalTableModification is true if the task is on modifying a local table.
|
||||||
*/
|
*/
|
||||||
bool containsOnlyLocalTable;
|
bool isLocalTableModification;
|
||||||
} Task;
|
} Task;
|
||||||
|
|
||||||
|
|
||||||
|
@ -583,7 +583,7 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
bool modifyRequiresCoordinatorEvaluation,
|
bool modifyRequiresCoordinatorEvaluation,
|
||||||
DeferredErrorMessage **planningError);
|
DeferredErrorMessage **planningError);
|
||||||
|
|
||||||
extern bool OnlyLocalTableJob(Job *job);
|
extern bool ModifyLocalTableJob(Job *job);
|
||||||
|
|
||||||
/* function declarations for managing jobs */
|
/* function declarations for managing jobs */
|
||||||
extern uint64 UniqueJobId(void);
|
extern uint64 UniqueJobId(void);
|
||||||
|
|
|
@ -87,7 +87,7 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
|
||||||
List *relationShardList,
|
List *relationShardList,
|
||||||
List *placementList,
|
List *placementList,
|
||||||
uint64 shardId,
|
uint64 shardId,
|
||||||
bool containsOnlyLocalTable);
|
bool isLocalTableModification);
|
||||||
extern bool IsRouterPlannable(Query *query,
|
extern bool IsRouterPlannable(Query *query,
|
||||||
PlannerRestrictionContext *plannerRestrictionContext);
|
PlannerRestrictionContext *plannerRestrictionContext);
|
||||||
|
|
||||||
|
|
|
@ -813,6 +813,11 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
CREATE view loc_view AS SELECT * FROM postgres_table WHERE key > 0;
|
||||||
|
UPDATE loc_view SET key = (SELECT COUNT(*) FROM distributed_table);
|
||||||
|
DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM local_table_join.distributed_table
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) FROM local_table_join.postgres_table WHERE (postgres_table.key OPERATOR(pg_catalog.>) 0)
|
||||||
|
ERROR: cannot modify views when the query contains citus tables
|
||||||
SELECT count(*)
|
SELECT count(*)
|
||||||
FROM
|
FROM
|
||||||
(SELECT * FROM (SELECT * FROM distributed_table) d1) d2
|
(SELECT * FROM (SELECT * FROM distributed_table) d1) d2
|
||||||
|
@ -1085,4 +1090,4 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP SCHEMA local_table_join CASCADE;
|
DROP SCHEMA local_table_join CASCADE;
|
||||||
NOTICE: drop cascades to 14 other objects
|
NOTICE: drop cascades to 15 other objects
|
||||||
|
|
|
@ -920,11 +920,11 @@ CREATE VIEW small_view AS SELECT * from small where id < 100;
|
||||||
\copy large FROM STDIN DELIMITER ','
|
\copy large FROM STDIN DELIMITER ','
|
||||||
-- running modify statements "on" views is still not supported, hence below two statements will fail
|
-- running modify statements "on" views is still not supported, hence below two statements will fail
|
||||||
UPDATE small_view SET id = 1;
|
UPDATE small_view SET id = 1;
|
||||||
ERROR: cannot modify views over distributed tables
|
ERROR: cannot modify views when the query contains citus tables
|
||||||
DELETE FROM small_view;
|
DELETE FROM small_view;
|
||||||
ERROR: cannot modify views over distributed tables
|
ERROR: cannot modify views when the query contains citus tables
|
||||||
INSERT INTO small_view VALUES(8, 5) ON CONFLICT(tenant_id) DO UPDATE SET tenant_id=99;
|
INSERT INTO small_view VALUES(8, 5) ON CONFLICT(tenant_id) DO UPDATE SET tenant_id=99;
|
||||||
ERROR: cannot modify views over distributed tables
|
ERROR: cannot modify views when the query contains citus tables
|
||||||
-- using views in modify statements' FROM / WHERE clauses is still valid
|
-- using views in modify statements' FROM / WHERE clauses is still valid
|
||||||
UPDATE large SET id=20 FROM small_view WHERE small_view.id=large.id;
|
UPDATE large SET id=20 FROM small_view WHERE small_view.id=large.id;
|
||||||
SELECT * FROM large order by 1, 2;
|
SELECT * FROM large order by 1, 2;
|
||||||
|
@ -1054,7 +1054,7 @@ SELECT * FROM large ORDER BY 1, 2;
|
||||||
|
|
||||||
-- INSERT INTO views is still not supported
|
-- INSERT INTO views is still not supported
|
||||||
INSERT INTO small_view VALUES(3, 3);
|
INSERT INTO small_view VALUES(3, 3);
|
||||||
ERROR: cannot modify views over distributed tables
|
ERROR: cannot modify views when the query contains citus tables
|
||||||
DROP TABLE large;
|
DROP TABLE large;
|
||||||
DROP TABLE small CASCADE;
|
DROP TABLE small CASCADE;
|
||||||
NOTICE: drop cascades to view small_view
|
NOTICE: drop cascades to view small_view
|
||||||
|
@ -1081,9 +1081,9 @@ CREATE VIEW small_view AS SELECT * from small where id < 100;
|
||||||
\copy large_partitioned FROM STDIN DELIMITER ','
|
\copy large_partitioned FROM STDIN DELIMITER ','
|
||||||
-- running modify statements "on" views is still not supported, hence below two statements will fail
|
-- running modify statements "on" views is still not supported, hence below two statements will fail
|
||||||
UPDATE small_view SET id = 1;
|
UPDATE small_view SET id = 1;
|
||||||
ERROR: cannot modify views over distributed tables
|
ERROR: cannot modify views when the query contains citus tables
|
||||||
DELETE FROM small_view;
|
DELETE FROM small_view;
|
||||||
ERROR: cannot modify views over distributed tables
|
ERROR: cannot modify views when the query contains citus tables
|
||||||
UPDATE large_partitioned SET id=27 FROM small_view WHERE small_view.tenant_id=large_partitioned.tenant_id;
|
UPDATE large_partitioned SET id=27 FROM small_view WHERE small_view.tenant_id=large_partitioned.tenant_id;
|
||||||
SELECT * FROM large_partitioned ORDER BY 1, 2;
|
SELECT * FROM large_partitioned ORDER BY 1, 2;
|
||||||
id | tenant_id
|
id | tenant_id
|
||||||
|
|
|
@ -209,6 +209,8 @@ FROM
|
||||||
WHERE
|
WHERE
|
||||||
d1.key = 1;
|
d1.key = 1;
|
||||||
|
|
||||||
|
CREATE view loc_view AS SELECT * FROM postgres_table WHERE key > 0;
|
||||||
|
UPDATE loc_view SET key = (SELECT COUNT(*) FROM distributed_table);
|
||||||
|
|
||||||
SELECT count(*)
|
SELECT count(*)
|
||||||
FROM
|
FROM
|
||||||
|
@ -260,10 +262,10 @@ EXECUTE local_dist_table_join_subquery(5);
|
||||||
EXECUTE local_dist_table_join_subquery(5);
|
EXECUTE local_dist_table_join_subquery(5);
|
||||||
EXECUTE local_dist_table_join_subquery(5);
|
EXECUTE local_dist_table_join_subquery(5);
|
||||||
|
|
||||||
PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key)
|
PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key)
|
||||||
WHERE(
|
WHERE(
|
||||||
distributed_table_composite.key = $1 OR
|
distributed_table_composite.key = $1 OR
|
||||||
distributed_table_composite.key = 20 OR
|
distributed_table_composite.key = 20 OR
|
||||||
(distributed_table_composite.key = 10 AND distributed_table_composite.key > 0) OR
|
(distributed_table_composite.key = 10 AND distributed_table_composite.key > 0) OR
|
||||||
distributed_table_composite.value = 'text'
|
distributed_table_composite.value = 'text'
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue