Improve error for index operator class parameters

The error message emitted when an index element has opclassopts (operator class parameters) is improved, and the relevant PostgreSQL commit is referenced in a code comment for future reference.

Some minor style-related changes are also applied.
pull/3900/head
Sait Talha Nisanci 2020-06-21 00:05:39 +03:00
parent 288aa58603
commit d68bfc5687
16 changed files with 50 additions and 81 deletions

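For context: PostgreSQL 13 (commit 911e70207703799605f5a0e8aad9f06cff067c63) added operator class parameters for index columns, which Citus cannot deparse yet, so the index deparse code below now rejects them with a clearer message. Below is a minimal, self-contained sketch of the shape of that guard; the names (IndexElementStub, ReportErrorStub, DeparseIndexElementStub) are simplified stand-ins for illustration, not actual Citus or PostgreSQL symbols.

#include <stdio.h>
#include <stdlib.h>

/* Stand-in for the parser node that carries per-column index options. */
typedef struct IndexElementStub
{
    void *opclassopts;  /* non-NULL when the operator class has parameters */
} IndexElementStub;

static void
ReportErrorStub(const char *message)
{
    fprintf(stderr, "ERROR:  %s\n", message);
    exit(1);
}

static void
DeparseIndexElementStub(IndexElementStub *indexElement)
{
    /* Fail early with a descriptive message instead of deparsing an
     * index definition that the workers cannot reproduce. */
    if (indexElement->opclassopts != NULL)
    {
        ReportErrorStub("citus currently doesn't support operator class parameters in indexes");
    }
}

int
main(void)
{
    IndexElementStub element = { .opclassopts = (void *) 1 };
    DeparseIndexElementStub(&element);  /* prints the error and exits */
    return 0;
}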
View File

@@ -33,6 +33,7 @@
 #define VACUUM_PARALLEL_NOTSET -2

 /*
  * Subset of VacuumParams we care about
  */
@@ -293,7 +294,7 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
 #endif
 #if PG_VERSION_NUM >= PG_VERSION_13
         && vacuumParams.nworkers == VACUUM_PARALLEL_NOTSET
 #endif
         )
     {
         return vacuumPrefix->data;
@@ -351,7 +352,8 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
 #endif
 #if PG_VERSION_NUM >= PG_VERSION_13
-    if (vacuumParams.nworkers != VACUUM_PARALLEL_NOTSET) {
+    if (vacuumParams.nworkers != VACUUM_PARALLEL_NOTSET)
+    {
         appendStringInfo(vacuumPrefix, "PARALLEL %d,", vacuumParams.nworkers);
     }
 #endif
@@ -454,7 +456,7 @@ VacuumStmtParams(VacuumStmt *vacstmt)
     params.index_cleanup = VACOPT_TERNARY_DEFAULT;
     params.truncate = VACOPT_TERNARY_DEFAULT;
 #if PG_VERSION_NUM >= PG_VERSION_13
     params.nworkers = VACUUM_PARALLEL_NOTSET;
 #endif

     /* Parse options list */
@@ -505,8 +507,8 @@ VacuumStmtParams(VacuumStmt *vacstmt)
                 VACOPT_TERNARY_DISABLED;
         }
 #if PG_VERSION_NUM >= PG_VERSION_13
-        else if (strcmp(opt->defname, "parallel") == 0) {
+        else if (strcmp(opt->defname, "parallel") == 0)
+        {
             if (opt->arg == NULL)
             {
                 ereport(ERROR,
@@ -516,13 +518,14 @@ VacuumStmtParams(VacuumStmt *vacstmt)
             }
             else
             {
-                int nworkers;
-                nworkers = defGetInt32(opt);
+                int nworkers = defGetInt32(opt);

                 if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT)
+                {
                     ereport(ERROR,
                             (errcode(ERRCODE_SYNTAX_ERROR),
                              errmsg("parallel vacuum degree must be between 0 and %d",
                                     MAX_PARALLEL_WORKER_LIMIT)));
+                }

                 params.nworkers = nworkers;
             }

View File

@@ -815,9 +815,13 @@ deparse_index_columns(StringInfo buffer, List *indexParameterList, List *deparse
                          NameListToQuotedString(indexElement->opclass));
         }
 #if PG_VERSION_NUM >= PG_VERSION_13
-        if (indexElement->opclassopts != NIL) {
-            ereport(ERROR, errmsg("citus currently doesn't support this index arguments"));
-        }
+
+        /* Commit on postgres: 911e70207703799605f5a0e8aad9f06cff067c63*/
+        if (indexElement->opclassopts != NIL)
+        {
+            ereport(ERROR, errmsg(
+                        "citus currently doesn't support operator class parameters in indexes"));
+        }
 #endif

         if (indexElement->ordering != SORTBY_DEFAULT)

View File

@@ -294,7 +294,7 @@ ExecuteLocalTaskListExtended(List *taskList,
          * implemented. So, let planner to call distributed_planner() which
          * eventually calls standard_planner().
          */
-        localPlan = planner_compat(shardQuery, NULL, cursorOptions, paramListInfo);
+        localPlan = planner_compat(shardQuery, cursorOptions, paramListInfo);
     }

     char *shardQueryString = NULL;
@@ -334,7 +334,7 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tuple
                                              0);
         int cursorOptions = 0;
         ParamListInfo paramListInfo = NULL;
-        PlannedStmt *localPlan = planner_compat(shardQuery, NULL, cursorOptions,
+        PlannedStmt *localPlan = planner_compat(shardQuery, cursorOptions,
                                                 paramListInfo);
         totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString,
                                                    tupleDest, task,

View File

@@ -295,7 +295,7 @@ BuildSelectStatementViaStdPlanner(Query *combineQuery, List *remoteScanTargetLis
     ReplaceCitusExtraDataContainer = true;
     ReplaceCitusExtraDataContainerWithCustomScan = remoteScan;
-    standardStmt = standard_planner_compat(combineQuery, NULL, 0, NULL);
+    standardStmt = standard_planner_compat(combineQuery, 0, NULL);
     ReplaceCitusExtraDataContainer = false;
     ReplaceCitusExtraDataContainerWithCustomScan = NULL;

View File

@@ -223,7 +223,6 @@ distributed_planner(Query *parse,
      * postgres' planner.
      */
     planContext.plan = standard_planner_compat(planContext.query,
-                                               NULL,
                                                planContext.cursorOptions,
                                                planContext.boundParams);
     if (needsDistributedPlanning)
@@ -1053,7 +1052,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
      * being contiguous.
      */
-    standard_planner_compat(newQuery, NULL, 0, boundParams);
+    standard_planner_compat(newQuery, 0, boundParams);

     /* overwrite the old transformed query with the new transformed query */
     *query = *newQuery;

View File

@@ -1389,7 +1389,7 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
     /* plan the subquery, this may be another distributed query */
     int cursorOptions = CURSOR_OPT_PARALLEL_OK;
     PlannedStmt *selectPlan = pg_plan_query_compat(selectQueryCopy, NULL, cursorOptions,
                                                    boundParams);
     bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
                          IsSupportedRedistributionTarget(targetRelationId);

View File

@@ -90,7 +90,7 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
     LockRelationOid(rangeTableEntry->relid, lockMode);

     LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
-    localPlan = planner_compat(shardQuery, NULL, 0, NULL);
+    localPlan = planner_compat(shardQuery, 0, NULL);
     localPlannedStatement->localPlan = localPlan;
     localPlannedStatement->shardId = task->anchorShardId;
     localPlannedStatement->localGroupId = GetLocalGroupId();

View File

@@ -314,8 +314,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
         INSTR_TIME_SET_ZERO(planduration);

-        ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration,
-                             NULL);
+        ExplainOnePlanCompat(plan, into, es, queryString, params, NULL, &planduration);

         if (es->format == EXPLAIN_FORMAT_TEXT)
         {
@@ -1135,7 +1134,7 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
     /* run it (if needed) and produce output */
     ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv,
-                         &planduration, NULL);
+                         &planduration);
 }
@@ -1466,7 +1465,7 @@ ExplainOneQuery(Query *query, int cursorOptions,
         /* run it (if needed) and produce output */
         ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv,
-                             &planduration, NULL);
+                             &planduration);
     }
 }

View File

@@ -1279,19 +1279,16 @@ SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry,
     /* We don't have any merged columns so set it to 0 */
     rangeTableEntry->joinmergedcols = 0;

-    Var *var = NULL;
-    int varId = 1;
-    foreach_ptr(var, leftColumnVars)
+    int numvars = list_length(leftColumnVars);
+    for (int varId = 1; varId <= numvars; varId++)
     {
         rangeTableEntry->joinleftcols = lappend_int(rangeTableEntry->joinleftcols, varId);
-        varId++;
     }

-    varId = 1;
-    foreach_ptr(var, rightColumnVars)
+    numvars = list_length(rightColumnVars);
+    for (int varId = 1; varId <= numvars; varId++)
     {
         rangeTableEntry->joinrightcols = lappend_int(rangeTableEntry->joinrightcols,
                                                      varId);
-        varId++;
     }
 #endif
 }

View File

@@ -1174,7 +1174,7 @@ CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery)
     }

     DistributedSubPlan *subPlan = CitusMakeNode(DistributedSubPlan);
-    subPlan->plan = planner_compat(subPlanQuery, NULL, cursorOptions, NULL);
+    subPlan->plan = planner_compat(subPlanQuery, cursorOptions, NULL);
     subPlan->subPlanId = subPlanId;

     return subPlan;

View File

@@ -50,7 +50,8 @@ partition_task_list_results(PG_FUNCTION_ARGS)
     bool binaryFormat = PG_GETARG_BOOL(3);

     Query *parsedQuery = ParseQueryString(queryString, NULL, 0);
-    PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString,
+    PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery,
+                                                  queryString,
                                                   CURSOR_OPT_PARALLEL_OK,
                                                   NULL);
     if (!IsCitusCustomScan(queryPlan->planTree))
@@ -123,7 +124,8 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
     bool binaryFormat = PG_GETARG_BOOL(3);

     Query *parsedQuery = ParseQueryString(queryString, NULL, 0);
-    PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery, queryString,
+    PlannedStmt *queryPlan = pg_plan_query_compat(parsedQuery,
+                                                  queryString,
                                                   CURSOR_OPT_PARALLEL_OK,
                                                   NULL);
     if (!IsCitusCustomScan(queryPlan->planTree))

View File

@@ -32,14 +32,14 @@
 #define lnext_compat(l, r) lnext(l, r)
 #define list_delete_cell_compat(l, c, p) list_delete_cell(l, c)
 #define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, q, c, b)
-#define planner_compat(p, q, c, b) planner(p, q, c, b)
-#define standard_planner_compat(a, b, c, d) standard_planner(a, b, c, d)
-#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, \
-                                                                       CMDTAG_SELECT, e, \
-                                                                       f)
+#define planner_compat(p, c, b) planner(p, NULL, c, b)
+#define standard_planner_compat(a, c, d) standard_planner(a, NULL, c, d)
+#define PortalDefineQuerySelectCompat(a, b, c, d, e) PortalDefineQuery(a, b, c, \
+                                                                       CMDTAG_SELECT, d, \
+                                                                       e)
 #define getOwnedSequencesCompat(a, b) getOwnedSequences(a)
-#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g, \
-                                                                    h)
+#define ExplainOnePlanCompat(a, b, c, d, e, f, g) ExplainOnePlan(a, b, c, d, e, f, g, \
+                                                                 NULL)
 #define varoattno varattnosyn
 #define varnoold varnosyn
 #define Set_ptr_value(a, b) ((a)->ptr_value = (b))
@@ -49,12 +49,12 @@
 #define lnext_compat(l, r) lnext(r)
 #define list_delete_cell_compat(l, c, p) list_delete_cell(l, c, p)
 #define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, c, b)
-#define planner_compat(p, q, c, b) planner(p, c, b)
-#define standard_planner_compat(a, b, c, d) standard_planner(a, c, d)
-#define PortalDefineQuerySelectCompat(a, b, c, e, f) PortalDefineQuery(a, b, c, "SELECT", \
-                                                                       e, f)
+#define planner_compat(p, c, b) planner(p, c, b)
+#define standard_planner_compat(a, c, d) standard_planner(a, c, d)
+#define PortalDefineQuerySelectCompat(a, b, c, d, e) PortalDefineQuery(a, b, c, "SELECT", \
+                                                                       d, e)
 #define getOwnedSequencesCompat(a, b) getOwnedSequences(a, b)
-#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g)
+#define ExplainOnePlanCompat(a, b, c, d, e, f, g) ExplainOnePlan(a, b, c, d, e, f, g)
 #define Set_ptr_value(a, b) ((a)->data.ptr_value = (b))
 #define RangeTableEntryFromNSItem(a) (a)
 #define QueryCompletionCompat char
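For reference, the call-site simplifications in this commit rely on the compat macros above supplying PostgreSQL 13's extra arguments (for example, a NULL query string for planner()) inside the macro, so callers stay identical across versions. Below is a small self-contained sketch of that pattern, using hypothetical stand-in functions (NewPlannerStub, OldPlannerStub) rather than the real planner symbols.

#include <stdio.h>
#include <stddef.h>

#define PG_VERSION_13 130000
#define PG_VERSION_NUM 130000  /* pretend we build against PostgreSQL 13 */

/* Stand-in for the PG13+ planner(), which gained a query-string parameter. */
static int
NewPlannerStub(int query, const char *queryString, int cursorOptions, void *boundParams)
{
    (void) boundParams;
    printf("planner(query=%d, queryString=%s, cursorOptions=%d)\n",
           query, queryString ? queryString : "NULL", cursorOptions);
    return query;
}

#if PG_VERSION_NUM >= PG_VERSION_13
/* PG13+: hide the new argument inside the macro, as version_compat.h does. */
#define planner_compat(p, c, b) NewPlannerStub(p, NULL, c, b)
#else
/* Pre-13: the three-argument form maps straight through. */
#define planner_compat(p, c, b) OldPlannerStub(p, c, b)
#endif

int
main(void)
{
    /* Call sites stay three-argument on every supported PostgreSQL version. */
    planner_compat(42, 0, NULL);
    return 0;
}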

View File

@@ -4,6 +4,7 @@
 SET citus.next_shard_id TO 1660000;
 SET citus.shard_count TO 4;
 SET citus.shard_replication_factor TO 1;
+SET citus.enable_repartition_joins to ON;
 --
 -- Distributed Partitioned Table Creation Tests
 --
@@ -1298,8 +1299,6 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
 (3 rows)

 COMMIT;
--- test locks on task-tracker SELECT
-SET citus.task_executor_type TO 'task-tracker';
 BEGIN;
 SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2;
  id | ref_id | time | id | ref_id | time
@@ -1315,7 +1314,6 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
 (3 rows)

 COMMIT;
-RESET citus.task_executor_type;
 -- test locks on INSERT
 BEGIN;
 INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01');

View File

@@ -169,7 +169,7 @@ SELECT create_distributed_table('test_table', 'a');
 -- we currently don't support this
 CREATE INDEX test_table_index ON test_table USING gist (b tsvector_ops(siglen = 100));
-ERROR: citus currently doesn't support this index arguments
+ERROR: citus currently doesn't support operator class parameters in indexes
 drop schema test_pg13 cascade;
 NOTICE: drop cascades to 10 other objects
 DETAIL: drop cascades to table dist_table

View File

@@ -355,39 +355,6 @@ SELECT
  TRUCK | 1757
 (7 rows)

-EXPLAIN (COSTS false, VERBOSE true)
-SELECT
-    l_shipmode, count(distinct l_partkey)
-FROM lineitem_hash
-GROUP BY l_shipmode
-HAVING count(distinct l_suppkey) > 1550
-ORDER BY 1, 2 DESC;
-                              QUERY PLAN
----------------------------------------------------------------------
- Sort
-   Output: remote_scan.l_shipmode, (count(DISTINCT remote_scan.count))
-   Sort Key: remote_scan.l_shipmode, (count(DISTINCT remote_scan.count)) DESC
-   ->  GroupAggregate
-         Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count)
-         Group Key: remote_scan.l_shipmode
-         Filter: (count(DISTINCT remote_scan.worker_column_3) > 1550)
-         ->  Sort
-               Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3
-               Sort Key: remote_scan.l_shipmode
-               ->  Custom Scan (Citus Adaptive)
-                     Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3
-                     Task Count: 8
-                     Tasks Shown: One of 8
-                     ->  Task
-                           Query: SELECT l_shipmode, l_partkey AS count, l_suppkey AS worker_column_3 FROM lineitem_hash_240000 lineitem_hash WHERE true GROUP BY l_shipmode, l_partkey, l_suppkey
-                           Node: host=localhost port=xxxxx dbname=regression
-                           ->  HashAggregate
-                                 Output: l_shipmode, l_partkey, l_suppkey
-                                 Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey, lineitem_hash.l_suppkey
-                                 ->  Seq Scan on public.lineitem_hash_240000 lineitem_hash
-                                       Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-(22 rows)
 -- count distinct is supported on single table subqueries
 SELECT *
 FROM (

View File

@@ -94,4 +94,4 @@ SELECT create_distributed_table('test_table', 'a');
 -- we currently don't support this
 CREATE INDEX test_table_index ON test_table USING gist (b tsvector_ops(siglen = 100));
 drop schema test_pg13 cascade;