From 3aed6c3ad07445cae772e45e6bfee00da84bf44e Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 9 Dec 2020 13:56:30 +0300 Subject: [PATCH] Rename containsOnlyLocalTable as isLocalTableModification Update error message in Modify View --- .../distributed/executor/citus_custom_scan.c | 6 ++-- .../planner/multi_physical_planner.c | 8 ++--- .../planner/multi_router_planner.c | 33 ++++++++++--------- .../distributed/utils/citus_copyfuncs.c | 2 +- .../distributed/utils/citus_outfuncs.c | 2 +- .../distributed/multi_physical_planner.h | 6 ++-- .../distributed/multi_router_planner.h | 2 +- .../regress/expected/local_table_join.out | 7 +++- src/test/regress/expected/multi_view.out | 12 +++---- src/test/regress/sql/local_table_join.sql | 8 +++-- 10 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index eb59b87a3..de809848c 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -370,7 +370,7 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) /* We skip shard related things if the job contains only local tables */ - if (!OnlyLocalTableJob(workerJob)) + if (!ModifyLocalTableJob(workerJob)) { /* * Now that we know the shard ID(s) we can acquire the necessary shard metadata @@ -544,12 +544,12 @@ RegenerateTaskForFasthPathQuery(Job *workerJob) shardId = GetAnchorShardId(shardIntervalList); } - bool containsOnlyLocalTable = false; + bool isLocalTableModification = false; GenerateSingleShardRouterTaskList(workerJob, relationShardList, placementList, shardId, - containsOnlyLocalTable); + isLocalTableModification); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 2bf343e1c..d838fc40a 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -272,11 +272,11 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, /* - * OnlyLocalTableJob true if the given task contains - * only postgres tables + * ModifyLocalTableJob returns true if the given task contains + * a modification of local table. */ bool -OnlyLocalTableJob(Job *job) +ModifyLocalTableJob(Job *job) { if (job == NULL) { @@ -288,7 +288,7 @@ OnlyLocalTableJob(Job *job) return false; } Task *singleTask = (Task *) linitial(taskList); - return singleTask->containsOnlyLocalTable; + return singleTask->isLocalTableModification; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 000061d9c..cc39b88cd 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -171,7 +171,7 @@ static int CompareInsertValuesByShardId(const void *leftElement, static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool containsOnlyLocalTable); + bool isLocalTableModification); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType @@ -321,8 +321,6 @@ IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionCon } } - /* TODO:: we might not need this copy*/ - copyQuery = copyObject(query); RouterJob(copyQuery, plannerRestrictionContext, &deferredErrorMessage); return deferredErrorMessage == NULL; } @@ -1044,7 +1042,8 @@ DeferErrorIfModifyView(Query *queryTree) firstRangeTableElement->inFromCl == false) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot modify views over distributed tables", NULL, + "cannot modify views when the query contains citus tables", + NULL, NULL); } } @@ -1718,7 +1717,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon /* router planner should create task even if it doesn't hit a shard at all */ bool replacePrunedQueryWithDummy = true; - bool containsOnlyLocalTable = false; + bool isLocalTableModification = false; /* check if this query requires coordinator evaluation */ bool requiresCoordinatorEvaluation = RequiresCoordinatorEvaluation(originalQuery); @@ -1748,7 +1747,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon replacePrunedQueryWithDummy, &isMultiShardModifyQuery, &partitionKeyValue, - &containsOnlyLocalTable); + &isLocalTableModification); } if (*planningError) @@ -1794,7 +1793,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon else { GenerateSingleShardRouterTaskList(job, relationShardList, - placementList, shardId, containsOnlyLocalTable); + placementList, shardId, + isLocalTableModification); } job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation; @@ -1811,7 +1811,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon void GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, List *placementList, uint64 shardId, bool - containsOnlyLocalTable) + isLocalTableModification) { Query *originalQuery = job->jobQuery; @@ -1821,7 +1821,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - containsOnlyLocalTable); + isLocalTableModification); /* * Queries to reference tables, or distributed tables with multiple replica's have @@ -1838,7 +1838,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, placementList); } } - else if (shardId == INVALID_SHARD_ID && !containsOnlyLocalTable) + else if (shardId == INVALID_SHARD_ID && !isLocalTableModification) { /* modification that prunes to 0 shards */ job->taskList = NIL; @@ -1849,7 +1849,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - containsOnlyLocalTable); + isLocalTableModification); } } @@ -1943,7 +1943,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool containsOnlyLocalTable) + bool isLocalTableModification) { TaskType taskType = READ_TASK; char replicationModel = 0; @@ -2002,7 +2002,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, } Task *task = CreateTask(taskType); - task->containsOnlyLocalTable = containsOnlyLocalTable; + task->isLocalTableModification = isLocalTableModification; List *relationRowLockList = NIL; RowLocksOnRelations((Node *) query, &relationRowLockList); @@ -2144,7 +2144,7 @@ PlanRouterQuery(Query *originalQuery, List **prunedShardIntervalListList, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, Const **partitionValueConst, - bool *containsOnlyLocalTable) + bool *isLocalTableModification) { bool isMultiShardQuery = false; DeferredErrorMessage *planningError = NULL; @@ -2262,7 +2262,10 @@ PlanRouterQuery(Query *originalQuery, RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery); if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties)) { - *containsOnlyLocalTable = true; + if (commandType != CMD_SELECT) + { + *isLocalTableModification = true; + } } bool hasPostgresLocalRelation = rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 763f5e910..8e725a7d0 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -327,7 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex); COPY_STRING_FIELD(fetchedExplainAnalyzePlan); COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); - COPY_SCALAR_FIELD(containsOnlyLocalTable); + COPY_SCALAR_FIELD(isLocalTableModification); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index e359fe7c3..abf18aa27 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -540,7 +540,7 @@ OutTask(OUTFUNC_ARGS) WRITE_INT_FIELD(fetchedExplainAnalyzePlacementIndex); WRITE_STRING_FIELD(fetchedExplainAnalyzePlan); WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); - WRITE_BOOL_FIELD(containsOnlyLocalTable); + WRITE_BOOL_FIELD(isLocalTableModification); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 125e7406f..6dd9153b8 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -330,9 +330,9 @@ typedef struct Task double fetchedExplainAnalyzeExecutionDuration; /* - * containsOnlyLocalTable is true if the task contains only postgres table/MV. + * isLocalTableModification is true if the task is on modifying a local table. */ - bool containsOnlyLocalTable; + bool isLocalTableModification; } Task; @@ -583,7 +583,7 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, bool modifyRequiresCoordinatorEvaluation, DeferredErrorMessage **planningError); -extern bool OnlyLocalTableJob(Job *job); +extern bool ModifyLocalTableJob(Job *job); /* function declarations for managing jobs */ extern uint64 UniqueJobId(void); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index d16ad2d3e..16e4576b3 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -87,7 +87,7 @@ extern void GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, List *placementList, uint64 shardId, - bool containsOnlyLocalTable); + bool isLocalTableModification); extern bool IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionContext); diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 3ccd32346..3b2072709 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -813,6 +813,11 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 1 (1 row) +CREATE view loc_view AS SELECT * FROM postgres_table WHERE key > 0; +UPDATE loc_view SET key = (SELECT COUNT(*) FROM distributed_table); +DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM local_table_join.distributed_table +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) FROM local_table_join.postgres_table WHERE (postgres_table.key OPERATOR(pg_catalog.>) 0) +ERROR: cannot modify views when the query contains citus tables SELECT count(*) FROM (SELECT * FROM (SELECT * FROM distributed_table) d1) d2 @@ -1085,4 +1090,4 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA local_table_join CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index a7ab6a71d..926bc5fcf 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -920,11 +920,11 @@ CREATE VIEW small_view AS SELECT * from small where id < 100; \copy large FROM STDIN DELIMITER ',' -- running modify statements "on" views is still not supported, hence below two statements will fail UPDATE small_view SET id = 1; -ERROR: cannot modify views over distributed tables +ERROR: cannot modify views when the query contains citus tables DELETE FROM small_view; -ERROR: cannot modify views over distributed tables +ERROR: cannot modify views when the query contains citus tables INSERT INTO small_view VALUES(8, 5) ON CONFLICT(tenant_id) DO UPDATE SET tenant_id=99; -ERROR: cannot modify views over distributed tables +ERROR: cannot modify views when the query contains citus tables -- using views in modify statements' FROM / WHERE clauses is still valid UPDATE large SET id=20 FROM small_view WHERE small_view.id=large.id; SELECT * FROM large order by 1, 2; @@ -1054,7 +1054,7 @@ SELECT * FROM large ORDER BY 1, 2; -- INSERT INTO views is still not supported INSERT INTO small_view VALUES(3, 3); -ERROR: cannot modify views over distributed tables +ERROR: cannot modify views when the query contains citus tables DROP TABLE large; DROP TABLE small CASCADE; NOTICE: drop cascades to view small_view @@ -1081,9 +1081,9 @@ CREATE VIEW small_view AS SELECT * from small where id < 100; \copy large_partitioned FROM STDIN DELIMITER ',' -- running modify statements "on" views is still not supported, hence below two statements will fail UPDATE small_view SET id = 1; -ERROR: cannot modify views over distributed tables +ERROR: cannot modify views when the query contains citus tables DELETE FROM small_view; -ERROR: cannot modify views over distributed tables +ERROR: cannot modify views when the query contains citus tables UPDATE large_partitioned SET id=27 FROM small_view WHERE small_view.tenant_id=large_partitioned.tenant_id; SELECT * FROM large_partitioned ORDER BY 1, 2; id | tenant_id diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index 40dd2aff6..475fca5d1 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -209,6 +209,8 @@ FROM WHERE d1.key = 1; +CREATE view loc_view AS SELECT * FROM postgres_table WHERE key > 0; +UPDATE loc_view SET key = (SELECT COUNT(*) FROM distributed_table); SELECT count(*) FROM @@ -260,10 +262,10 @@ EXECUTE local_dist_table_join_subquery(5); EXECUTE local_dist_table_join_subquery(5); EXECUTE local_dist_table_join_subquery(5); -PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key) +PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key) WHERE( - distributed_table_composite.key = $1 OR - distributed_table_composite.key = 20 OR + distributed_table_composite.key = $1 OR + distributed_table_composite.key = 20 OR (distributed_table_composite.key = 10 AND distributed_table_composite.key > 0) OR distributed_table_composite.value = 'text' );