diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 52b102395..706fea00c 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -33,6 +33,7 @@ #define VACUUM_PARALLEL_NOTSET -2 + /* * Subset of VacuumParams we care about */ @@ -293,7 +294,7 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams) #endif #if PG_VERSION_NUM >= PG_VERSION_13 && vacuumParams.nworkers == VACUUM_PARALLEL_NOTSET -#endif +#endif ) { return vacuumPrefix->data; @@ -351,7 +352,8 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams) #endif #if PG_VERSION_NUM >= PG_VERSION_13 - if (vacuumParams.nworkers != VACUUM_PARALLEL_NOTSET) { + if (vacuumParams.nworkers != VACUUM_PARALLEL_NOTSET) + { appendStringInfo(vacuumPrefix, "PARALLEL %d,", vacuumParams.nworkers); } #endif @@ -454,7 +456,7 @@ VacuumStmtParams(VacuumStmt *vacstmt) params.index_cleanup = VACOPT_TERNARY_DEFAULT; params.truncate = VACOPT_TERNARY_DEFAULT; #if PG_VERSION_NUM >= PG_VERSION_13 - params.nworkers = VACUUM_PARALLEL_NOTSET; + params.nworkers = VACUUM_PARALLEL_NOTSET; #endif /* Parse options list */ @@ -505,8 +507,8 @@ VacuumStmtParams(VacuumStmt *vacstmt) VACOPT_TERNARY_DISABLED; } #if PG_VERSION_NUM >= PG_VERSION_13 - else if (strcmp(opt->defname, "parallel") == 0) { - + else if (strcmp(opt->defname, "parallel") == 0) + { if (opt->arg == NULL) { ereport(ERROR, @@ -516,13 +518,14 @@ VacuumStmtParams(VacuumStmt *vacstmt) } else { - int nworkers; - nworkers = defGetInt32(opt); + int nworkers = defGetInt32(opt); if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT) + { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("parallel vacuum degree must be between 0 and %d", MAX_PARALLEL_WORKER_LIMIT))); + } params.nworkers = nworkers; } diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 9203d9364..2dd90a722 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -815,9 +815,13 @@ deparse_index_columns(StringInfo buffer, List *indexParameterList, List *deparse NameListToQuotedString(indexElement->opclass)); } #if PG_VERSION_NUM >= PG_VERSION_13 - if (indexElement->opclassopts != NIL) { - ereport(ERROR, errmsg("citus currently doesn't support this index arguments")); - } + + /* Commit on postgres: 911e70207703799605f5a0e8aad9f06cff067c63*/ + if (indexElement->opclassopts != NIL) + { + ereport(ERROR, errmsg( + "citus currently doesn't support operator class parameters in indexes")); + } #endif if (indexElement->ordering != SORTBY_DEFAULT) diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index e15624776..1e6b1f2c1 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -294,7 +294,7 @@ ExecuteLocalTaskListExtended(List *taskList, * implemented. So, let planner to call distributed_planner() which * eventually calls standard_planner(). */ - localPlan = planner_compat(shardQuery, NULL, cursorOptions, paramListInfo); + localPlan = planner_compat(shardQuery, cursorOptions, paramListInfo); } char *shardQueryString = NULL; @@ -334,7 +334,7 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tuple 0); int cursorOptions = 0; ParamListInfo paramListInfo = NULL; - PlannedStmt *localPlan = planner_compat(shardQuery, NULL, cursorOptions, + PlannedStmt *localPlan = planner_compat(shardQuery, cursorOptions, paramListInfo); totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, tupleDest, task, diff --git a/src/backend/distributed/planner/combine_query_planner.c b/src/backend/distributed/planner/combine_query_planner.c index 0db7d4b6f..f0f2e4860 100644 --- a/src/backend/distributed/planner/combine_query_planner.c +++ b/src/backend/distributed/planner/combine_query_planner.c @@ -295,7 +295,7 @@ BuildSelectStatementViaStdPlanner(Query *combineQuery, List *remoteScanTargetLis ReplaceCitusExtraDataContainer = true; ReplaceCitusExtraDataContainerWithCustomScan = remoteScan; - standardStmt = standard_planner_compat(combineQuery, NULL, 0, NULL); + standardStmt = standard_planner_compat(combineQuery, 0, NULL); ReplaceCitusExtraDataContainer = false; ReplaceCitusExtraDataContainerWithCustomScan = NULL; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 3a7f77a95..cdcac8f81 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -223,7 +223,6 @@ distributed_planner(Query *parse, * postgres' planner. */ planContext.plan = standard_planner_compat(planContext.query, - NULL, planContext.cursorOptions, planContext.boundParams); if (needsDistributedPlanning) @@ -1053,7 +1052,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi * being contiguous. */ - standard_planner_compat(newQuery, NULL, 0, boundParams); + standard_planner_compat(newQuery, 0, boundParams); /* overwrite the old transformed query with the new transformed query */ *query = *newQuery; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 67037916b..bd3501b8e 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -1389,7 +1389,7 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou /* plan the subquery, this may be another distributed query */ int cursorOptions = CURSOR_OPT_PARALLEL_OK; PlannedStmt *selectPlan = pg_plan_query_compat(selectQueryCopy, NULL, cursorOptions, - boundParams); + boundParams); bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && IsSupportedRedistributionTarget(targetRelationId); diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index 380cb0952..a9b139eea 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -90,7 +90,7 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan LockRelationOid(rangeTableEntry->relid, lockMode); LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement); - localPlan = planner_compat(shardQuery, NULL, 0, NULL); + localPlan = planner_compat(shardQuery, 0, NULL); localPlannedStatement->localPlan = localPlan; localPlannedStatement->shardId = task->anchorShardId; localPlannedStatement->localGroupId = GetLocalGroupId(); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index c89e40d1c..f386f3123 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -314,8 +314,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) INSTR_TIME_SET_ZERO(planduration); - ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration, - NULL); + ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration); if (es->format == EXPLAIN_FORMAT_TEXT) { @@ -1135,7 +1134,7 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, /* run it (if needed) and produce output */ ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv, - &planduration, NULL); + &planduration); } @@ -1466,7 +1465,7 @@ ExplainOneQuery(Query *query, int cursorOptions, /* run it (if needed) and produce output */ ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv, - &planduration, NULL); + &planduration); } } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 764d89e6a..8729a4bb5 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1279,19 +1279,16 @@ SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry, /* We don't have any merged columns so set it to 0 */ rangeTableEntry->joinmergedcols = 0; - Var *var = NULL; - int varId = 1; - foreach_ptr(var, leftColumnVars) + int numvars = list_length(leftColumnVars); + for (int varId = 1; varId <= numvars; varId++) { rangeTableEntry->joinleftcols = lappend_int(rangeTableEntry->joinleftcols, varId); - varId++; } - varId = 1; - foreach_ptr(var, rightColumnVars) + numvars = list_length(rightColumnVars); + for (int varId = 1; varId <= numvars; varId++) { rangeTableEntry->joinrightcols = lappend_int(rangeTableEntry->joinrightcols, varId); - varId++; } #endif } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index d1c07adb1..9e844a437 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1174,7 +1174,7 @@ CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery) } DistributedSubPlan *subPlan = CitusMakeNode(DistributedSubPlan); - subPlan->plan = planner_compat(subPlanQuery, NULL, cursorOptions, NULL); + subPlan->plan = planner_compat(subPlanQuery, cursorOptions, NULL); subPlan->subPlanId = subPlanId; return subPlan; diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c index 2ab999685..120cea563 100644 --- a/src/backend/distributed/test/distributed_intermediate_results.c +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -50,7 +50,8 @@ partition_task_list_results(PG_FUNCTION_ARGS) bool binaryFormat = PG_GETARG_BOOL(3); Query *parsedQuery = ParseQueryString(queryString, NULL, 0); - PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString, + PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, + queryString, CURSOR_OPT_PARALLEL_OK, NULL); if (!IsCitusCustomScan(queryPlan->planTree)) @@ -123,7 +124,8 @@ redistribute_task_list_results(PG_FUNCTION_ARGS) bool binaryFormat = PG_GETARG_BOOL(3); Query *parsedQuery = ParseQueryString(queryString, NULL, 0); - PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString, + PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, + queryString, CURSOR_OPT_PARALLEL_OK, NULL); if (!IsCitusCustomScan(queryPlan->planTree)) diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 62849040a..75bef4f04 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -32,14 +32,14 @@ #define lnext_compat(l, r) lnext(l, r) #define list_delete_cell_compat(l, c, p) list_delete_cell(l, c) #define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, q, c, b) -#define planner_compat(p, q, c, b) planner(p, q, c, b) -#define standard_planner_compat(a, b, c, d) standard_planner(a, b, c, d) -#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, \ - CMDTAG_SELECT, e, \ - f) +#define planner_compat(p, c, b) planner(p, NULL, c, b) +#define standard_planner_compat(a, c, d) standard_planner(a, NULL, c, d) +#define PortalDefineQuerySelectCompat(a, b, c, d, e) PortalDefineQuery(a, b, c, \ + CMDTAG_SELECT, d, \ + e) #define getOwnedSequencesCompat(a, b) getOwnedSequences(a) -#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g, \ - h) +#define ExplainOnePlanCompat(a, b, c, d, e, f, g) ExplainOnePlan(a, b, c, d, e, f, g, \ + NULL) #define varoattno varattnosyn #define varnoold varnosyn #define Set_ptr_value(a, b) ((a)->ptr_value = (b)) @@ -49,12 +49,12 @@ #define lnext_compat(l, r) lnext(r) #define list_delete_cell_compat(l, c, p) list_delete_cell(l, c, p) #define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, c, b) -#define planner_compat(p, q, c, b) planner(p, c, b) -#define standard_planner_compat(a, b, c, d) standard_planner(a, c, d) -#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, "SELECT", \ - e, f) +#define planner_compat(p, c, b) planner(p, c, b) +#define standard_planner_compat(a, c, d) standard_planner(a, c, d) +#define PortalDefineQuerySelectCompat(a, b, c, d, e) PortalDefineQuery(a, b, c, "SELECT", \ + d, e) #define getOwnedSequencesCompat(a, b) getOwnedSequences(a, b) -#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g) +#define ExplainOnePlanCompat(a, b, c, d, e, f, g) ExplainOnePlan(a, b, c, d, e, f, g) #define Set_ptr_value(a, b) ((a)->data.ptr_value = (b)) #define RangeTableEntryFromNSItem(a) (a) #define QueryCompletionCompat char diff --git a/src/test/regress/expected/multi_partitioning_1.out b/src/test/regress/expected/multi_partitioning_1.out index 18691286c..e85a4a2a9 100644 --- a/src/test/regress/expected/multi_partitioning_1.out +++ b/src/test/regress/expected/multi_partitioning_1.out @@ -4,6 +4,7 @@ SET citus.next_shard_id TO 1660000; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; +SET citus.enable_repartition_joins to ON; -- -- Distributed Partitioned Table Creation Tests -- @@ -1298,8 +1299,6 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass (3 rows) COMMIT; --- test locks on task-tracker SELECT -SET citus.task_executor_type TO 'task-tracker'; BEGIN; SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2; id | ref_id | time | id | ref_id | time @@ -1315,7 +1314,6 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass (3 rows) COMMIT; -RESET citus.task_executor_type; -- test locks on INSERT BEGIN; INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01'); diff --git a/src/test/regress/expected/pg13.out b/src/test/regress/expected/pg13.out index db8e29f00..938604788 100644 --- a/src/test/regress/expected/pg13.out +++ b/src/test/regress/expected/pg13.out @@ -169,7 +169,7 @@ SELECT create_distributed_table('test_table', 'a'); -- we currently don't support this CREATE INDEX test_table_index ON test_table USING gist (b tsvector_ops(siglen = 100)); -ERROR: citus currently doesn't support this index arguments +ERROR: citus currently doesn't support operator class parameters in indexes drop schema test_pg13 cascade; NOTICE: drop cascades to 10 other objects DETAIL: drop cascades to table dist_table diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 6a3f0a098..47c4c5bdb 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -355,39 +355,6 @@ SELECT TRUCK | 1757 (7 rows) -EXPLAIN (COSTS false, VERBOSE true) -SELECT - l_shipmode, count(distinct l_partkey) - FROM lineitem_hash - GROUP BY l_shipmode - HAVING count(distinct l_suppkey) > 1550 - ORDER BY 1, 2 DESC; - QUERY PLAN ---------------------------------------------------------------------- - Sort - Output: remote_scan.l_shipmode, (count(DISTINCT remote_scan.count)) - Sort Key: remote_scan.l_shipmode, (count(DISTINCT remote_scan.count)) DESC - -> GroupAggregate - Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count) - Group Key: remote_scan.l_shipmode - Filter: (count(DISTINCT remote_scan.worker_column_3) > 1550) - -> Sort - Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 - Sort Key: remote_scan.l_shipmode - -> Custom Scan (Citus Adaptive) - Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Query: SELECT l_shipmode, l_partkey AS count, l_suppkey AS worker_column_3 FROM lineitem_hash_240000 lineitem_hash WHERE true GROUP BY l_shipmode, l_partkey, l_suppkey - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Output: l_shipmode, l_partkey, l_suppkey - Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey, lineitem_hash.l_suppkey - -> Seq Scan on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(22 rows) - -- count distinct is supported on single table subqueries SELECT * FROM ( diff --git a/src/test/regress/sql/pg13.sql b/src/test/regress/sql/pg13.sql index 7877eb9e1..3c423c96e 100644 --- a/src/test/regress/sql/pg13.sql +++ b/src/test/regress/sql/pg13.sql @@ -94,4 +94,4 @@ SELECT create_distributed_table('test_table', 'a'); -- we currently don't support this CREATE INDEX test_table_index ON test_table USING gist (b tsvector_ops(siglen = 100)); -drop schema test_pg13 cascade; \ No newline at end of file +drop schema test_pg13 cascade;