diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index e1820a74e..a06f060f3 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -300,7 +300,8 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags) * The plan will be cached across executions when originalDistributedPlan * represents a prepared statement. */ - CacheLocalPlanForShardQuery(task, originalDistributedPlan); + CacheLocalPlanForShardQuery(task, originalDistributedPlan, + estate->es_param_list_info); } } @@ -414,7 +415,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) * The plan will be cached across executions when originalDistributedPlan * represents a prepared statement. */ - CacheLocalPlanForShardQuery(task, originalDistributedPlan); + CacheLocalPlanForShardQuery(task, originalDistributedPlan, + estate->es_param_list_info); } MemoryContextSwitchTo(oldContext); diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index a824573ed..1844a3246 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -128,9 +128,6 @@ static void LogLocalCommand(Task *task); static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tupleDest, Task *task); -static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, - Oid **parameterTypes, - const char ***parameterValues); static void ExecuteUdfTaskQuery(Query *localUdfCommandQuery); static void EnsureTransitionPossible(LocalExecutionStatus from, LocalExecutionStatus to); @@ -438,7 +435,7 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tuple * value arrays. It does not change the oid of custom types, because the * query will be run locally. */ -static void +void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues) { diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 01998b029..c83170eac 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -41,8 +41,6 @@ static void AddInsertAliasIfNeeded(Query *query); static void UpdateTaskQueryString(Query *query, Task *task); -static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList, - OnConflictExpr *onConflict); static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList); static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); @@ -269,124 +267,6 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) } -/* - * UpdateRelationsToLocalShardTables walks over the query tree and appends shard ids to - * relations. The caller is responsible for ensuring that the resulting Query can - * be executed locally. 
- */ -bool -UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) -{ - if (node == NULL) - { - return false; - } - - /* want to look at all RTEs, even in subqueries, CTEs and such */ - if (IsA(node, Query)) - { - return query_tree_walker((Query *) node, UpdateRelationsToLocalShardTables, - relationShardList, QTW_EXAMINE_RTES_BEFORE); - } - - if (IsA(node, OnConflictExpr)) - { - OnConflictExpr *onConflict = (OnConflictExpr *) node; - - return ReplaceRelationConstraintByShardConstraint(relationShardList, onConflict); - } - - if (!IsA(node, RangeTblEntry)) - { - return expression_tree_walker(node, UpdateRelationsToLocalShardTables, - relationShardList); - } - - RangeTblEntry *newRte = (RangeTblEntry *) node; - - if (newRte->rtekind != RTE_RELATION) - { - return false; - } - - RelationShard *relationShard = FindRelationShard(newRte->relid, - relationShardList); - - /* the function should only be called with local shards */ - if (relationShard == NULL) - { - return true; - } - - Oid shardOid = GetTableLocalShardOid(relationShard->relationId, - relationShard->shardId); - - newRte->relid = shardOid; - - return false; -} - - -/* - * ReplaceRelationConstraintByShardConstraint replaces given OnConflictExpr's - * constraint id with constraint id of the corresponding shard. - */ -static bool -ReplaceRelationConstraintByShardConstraint(List *relationShardList, - OnConflictExpr *onConflict) -{ - Oid constraintId = onConflict->constraint; - - if (!OidIsValid(constraintId)) - { - return false; - } - - Oid constraintRelationId = InvalidOid; - - HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintId)); - if (HeapTupleIsValid(heapTuple)) - { - Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(heapTuple); - - constraintRelationId = contup->conrelid; - ReleaseSysCache(heapTuple); - } - - /* - * We can return here without calling the walker function, since we know there - * will be no possible tables or constraints after this point, by the syntax. - */ - if (!OidIsValid(constraintRelationId)) - { - ereport(ERROR, (errmsg("Invalid relation id (%u) for constraint: %s", - constraintRelationId, get_constraint_name(constraintId)))); - } - - RelationShard *relationShard = FindRelationShard(constraintRelationId, - relationShardList); - - if (relationShard != NULL) - { - char *constraintName = get_constraint_name(constraintId); - - AppendShardIdToName(&constraintName, relationShard->shardId); - - Oid shardOid = GetTableLocalShardOid(relationShard->relationId, - relationShard->shardId); - - Oid shardConstraintId = get_relation_constraint_oid(shardOid, constraintName, - false); - - onConflict->constraint = shardConstraintId; - - return false; - } - - return true; -} - - /* * FindRelationShard finds the RelationShard for shard relation with * given Oid if exists in given relationShardList. Otherwise, returns NULL. 
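With UpdateRelationsToLocalShardTables removed above, the cached local plan is no longer built by swapping relation OIDs inside the already-parsed job query; as the comments added below explain, DROP COLUMN can leave the shard and the shell table with different attribute numbers, so that in-place swap is unsafe. The new code in local_plan_cache.c below instead deparses the job query with shard names and parses the string back. The following is a condensed, illustrative sketch of that round trip only: the helper name is hypothetical, the usual Citus/PostgreSQL headers are assumed, and locking, logging, and memory-context handling are omitted.

/*
 * Sketch only (hypothetical helper): turn the job query into a shard query
 * string and parse it back, so the resulting Query carries the shard
 * relation's own attribute numbers instead of the shell table's.
 */
static Query *
RebuildLocalShardQuerySketch(Query *jobQuery, Task *task,
                             Oid *parameterTypes, int numberOfParameters)
{
    StringInfo shardQueryString = makeStringInfo();

    if (jobQuery->commandType == CMD_INSERT)
    {
        /* INSERTs go through the existing shard-query deparser */
        deparse_shard_query(jobQuery, task->anchorDistributedTableId,
                            task->anchorShardId, shardQueryString);
    }
    else
    {
        /* SELECT/UPDATE/DELETE: rewrite table names to shard names, then deparse */
        UpdateRelationToShardNames((Node *) jobQuery, task->relationShardList);
        pg_get_query_def(jobQuery, shardQueryString);
    }

    /* parse the deparsed shard query back into a Query tree */
    return ParseQueryString(shardQueryString->data, parameterTypes,
                            numberOfParameters);
}

The actual change splits this into GetLocalShardQueryForCache() and DeparseLocalShardQuery(), and derives parameterTypes from the executor's ParamListInfo that is now passed into CacheLocalPlanForShardQuery().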
diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index 3ae83d235..f0c30a01e 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -16,19 +16,29 @@ #include "distributed/local_plan_cache.h" #include "distributed/deparse_shard_query.h" #include "distributed/citus_ruleutils.h" +#include "distributed/insert_select_planner.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" #include "distributed/version_compat.h" #include "optimizer/optimizer.h" #include "optimizer/clauses.h" +static Query * GetLocalShardQueryForCache(Query *jobQuery, Task *task, + ParamListInfo paramListInfo); +static char * DeparseLocalShardQuery(Query *jobQuery, List *relationShardList, + Oid anchorDistributedTableId, int64 anchorShardId); +static int ExtractParameterTypesForParamListInfo(ParamListInfo originalParamListInfo, + Oid **parameterTypes); + /* * CacheLocalPlanForShardQuery replaces the relation OIDs in the job query * with shard relation OIDs and then plans the query and caches the result * in the originalDistributedPlan (which may be preserved across executions). */ void -CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan) +CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan, + ParamListInfo paramListInfo) { PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan); if (localPlan != NULL) @@ -54,14 +64,14 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan * We prefer to use jobQuery (over task->query) because we don't want any * functions/params to have been evaluated in the cached plan. */ - Query *shardQuery = copyObject(originalDistributedPlan->workerJob->jobQuery); + Query *jobQuery = copyObject(originalDistributedPlan->workerJob->jobQuery); - UpdateRelationsToLocalShardTables((Node *) shardQuery, task->relationShardList); + Query *localShardQuery = GetLocalShardQueryForCache(jobQuery, task, paramListInfo); - LOCKMODE lockMode = GetQueryLockMode(shardQuery); + LOCKMODE lockMode = GetQueryLockMode(localShardQuery); /* fast path queries can only have a single RTE by definition */ - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(shardQuery->rtable); + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(localShardQuery->rtable); /* * If the shard has been created in this transction, we wouldn't see the relationId @@ -69,24 +79,16 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan */ if (rangeTableEntry->relid == InvalidOid) { - pfree(shardQuery); + pfree(jobQuery); + pfree(localShardQuery); MemoryContextSwitchTo(oldContext); return; } - if (IsLoggableLevel(DEBUG5)) - { - StringInfo queryString = makeStringInfo(); - pg_get_query_def(shardQuery, queryString); - - ereport(DEBUG5, (errmsg("caching plan for query: %s", - queryString->data))); - } - LockRelationOid(rangeTableEntry->relid, lockMode); LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement); - localPlan = planner_compat(shardQuery, 0, NULL); + localPlan = planner_compat(localShardQuery, 0, NULL); localPlannedStatement->localPlan = localPlan; localPlannedStatement->shardId = task->anchorShardId; localPlannedStatement->localGroupId = GetLocalGroupId(); @@ -99,6 +101,128 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan } +/* + * GetLocalShardQueryForCache is a helper 
function which generates + the local shard query based on the jobQuery. The function should + not be used for generic purposes; it is specialized for local cached + queries. + * + * Attribute numbers are not guaranteed to be consistent between the shards + * and the shell tables (e.g., distributed/reference tables) due to DROP COLUMN + * commands. + * + * To avoid any edge cases due to such discrepancies, we first deparse the + * jobQuery with the tables replaced by their shards, and then parse the query + * string back. This is normally a very expensive operation; however, we only + * do it once per cached local plan, which is acceptable. + */ +static Query * +GetLocalShardQueryForCache(Query *jobQuery, Task *task, ParamListInfo orig_paramListInfo) +{ + char *shardQueryString = + DeparseLocalShardQuery(jobQuery, task->relationShardList, + task->anchorDistributedTableId, + task->anchorShardId); + ereport(DEBUG5, (errmsg("Local shard query that is going to be cached: %s", + shardQueryString))); + + Oid *parameterTypes = NULL; + int numberOfParameters = + ExtractParameterTypesForParamListInfo(orig_paramListInfo, &parameterTypes); + + Query *localShardQuery = + ParseQueryString(shardQueryString, parameterTypes, numberOfParameters); + + return localShardQuery; +} + + +/* + * DeparseLocalShardQuery is a helper function to deparse the given jobQuery for the shard(s) + * identified by the relationShardList, anchorDistributedTableId and anchorShardId. + * + * For the details and a comparison with TaskQueryString(), see the comments in the function. + */ +static char * +DeparseLocalShardQuery(Query *jobQuery, List *relationShardList, Oid + anchorDistributedTableId, int64 anchorShardId) +{ + StringInfo queryString = makeStringInfo(); + + /* + * We imitate what TaskQueryString() does, but we cannot rely on that function + * as the parameters might already have been resolved on the QueryTree in the + * task. Instead, we operate on the jobQuery, where we are sure that + * coordinator evaluation has not happened. + * + * Local shard queries are only applicable for local cached query execution. + * In the local cached query execution mode, we can use a query structure + * (or query string) with unevaluated expressions, as we allow function calls + * to be evaluated when the query on the shard is executed (i.e., there is no + * coordinator evaluation; instead the Postgres executor evaluates the values). + * + * Additionally, we can allow them to be evaluated again because they are stable, + * and we do not cache plans / use unevaluated query strings for queries containing + * volatile functions. + */ + if (jobQuery->commandType == CMD_INSERT) + { + /* + * We currently do not support INSERT .. SELECT here. To support INSERT..SELECT + * queries, we would have to update the relation names to shard names in the + * SELECT clause as well (e.g., via UpdateRelationToShardNames()). + */ + Assert(!CheckInsertSelectQuery(jobQuery)); + + /* + * For INSERT queries we cannot use pg_get_query_def, mainly because we + * cannot run UpdateRelationToShardNames on an INSERT query. This is + * because the PG deparsing logic fails when trying to insert into an + * RTE_FUNCTION (which is what would happen if you called + * UpdateRelationToShardNames). 
*/ + deparse_shard_query(jobQuery, anchorDistributedTableId, anchorShardId, + queryString); + } + else + { + UpdateRelationToShardNames((Node *) jobQuery, relationShardList); + + pg_get_query_def(jobQuery, queryString); + } + + return queryString->data; +} + + +/* + * ExtractParameterTypesForParamListInfo is a helper function which extracts + * the parameter types of the given ParamListInfo via its second output + * parameter. + * + * The function also returns the number of parameters. If no parameter exists, + * the function returns 0. + */ +static int +ExtractParameterTypesForParamListInfo(ParamListInfo originalParamListInfo, + Oid **parameterTypes) +{ + *parameterTypes = NULL; + + int numberOfParameters = 0; + if (originalParamListInfo != NULL) + { + const char **parameterValues = NULL; + ParamListInfo paramListInfo = copyParamList(originalParamListInfo); + ExtractParametersForLocalExecution(paramListInfo, parameterTypes, + &parameterValues); + numberOfParameters = paramListInfo->numParams; + } + + return numberOfParameters; +} + + +/* + * GetCachedLocalPlan is a helper function which return the cached + * plan in the distributedPlan for the given task if exists. diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index f98c0d996..bdd1eb600 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -28,7 +28,7 @@ extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); extern char * TaskQueryString(Task *task); extern char * TaskQueryStringAtIndex(Task *task, int index); -extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); extern int GetTaskQueryType(Task *task); + #endif /* DEPARSE_SHARD_QUERY_H */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 7a02be0f6..a47dccb17 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -46,5 +46,8 @@ extern bool TaskAccessesLocalNode(Task *task); extern void ErrorIfTransactionAccessedPlacementsLocally(void); extern void DisableLocalExecution(void); extern void SetLocalExecutionStatus(LocalExecutionStatus newStatus); +extern void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues); #endif /* LOCAL_EXECUTION_H */ diff --git a/src/include/distributed/local_plan_cache.h b/src/include/distributed/local_plan_cache.h index ac4c503d1..510e7b706 100644 --- a/src/include/distributed/local_plan_cache.h +++ b/src/include/distributed/local_plan_cache.h @@ -5,6 +5,7 @@ extern bool IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan); extern PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan); extern void CacheLocalPlanForShardQuery(Task *task, - DistributedPlan *originalDistributedPlan); + DistributedPlan *originalDistributedPlan, + ParamListInfo paramListInfo); #endif /* LOCAL_PLAN_CACHE */ diff --git a/src/test/regress/expected/local_shard_execution_dropped_column.out b/src/test/regress/expected/local_shard_execution_dropped_column.out new file mode 100644 index 000000000..993321735 --- /dev/null +++ b/src/test/regress/expected/local_shard_execution_dropped_column.out @@ -0,0 +1,334 @@ +CREATE SCHEMA local_shard_execution_dropped_column; +SET search_path TO local_shard_execution_dropped_column; +SET 
citus.next_shard_id TO 2460000; +-- the scenario is described on https://github.com/citusdata/citus/issues/5038 +-- first stop the metadata syncing to the node do that drop column +-- is not propogated +SELECT stop_metadata_sync_to_node('localhost',:worker_1_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT stop_metadata_sync_to_node('localhost',:worker_2_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +-- create a distributed table, drop a column and sync the metadata +SET citus.shard_replication_factor TO 1; +CREATE TABLE t1 (a int, b int, c int UNIQUE); +SELECT create_distributed_table('t1', 'c'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE t1 DROP COLUMN b; +SELECT start_metadata_sync_to_node('localhost',:worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost',:worker_2_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +SET search_path TO local_shard_execution_dropped_column; +-- show the dropped columns +SELECT attrelid::regclass, attname, attnum, attisdropped +FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid') +ORDER BY 1, 3, 2, 4; + attrelid | attname | attnum | attisdropped +--------------------------------------------------------------------- + t1_2460000 | a | 1 | f + t1_2460000 | ........pg.dropped.2........ 
| 2 | t + t1_2460000 | c | 3 | f + t1 | a | 1 | f + t1 | c | 2 | f +(5 rows) + +-- connect to a worker node where local execution is done +prepare p1(int) as insert into t1(a,c) VALUES (5,$1) ON CONFLICT (c) DO NOTHING; +SET citus.log_remote_commands TO ON; +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING +prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c; +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + 
+execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +prepare p3(int) as INSERT INTO t1(a,c) VALUES (5, $1), (6, $1), (7, $1),(5, $1), (6, $1), (7, $1) ON CONFLICT DO NOTHING; +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO 
local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING +prepare p4(int) as UPDATE t1 SET a = a + 1 WHERE c = $1; +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +execute p4(8); +NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +\c - - - :master_port +-- one another combination is that the shell table +-- has a dropped column but not the shard, via rebalance operation +SET search_path TO local_shard_execution_dropped_column; +ALTER TABLE t1 DROP COLUMN a; +SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SET search_path TO local_shard_execution_dropped_column; +-- show the dropped columns +SELECT attrelid::regclass, attname, attnum, attisdropped +FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid') +ORDER BY 1, 3, 2, 4; + attrelid | attname | attnum | attisdropped +--------------------------------------------------------------------- + t1 | ........pg.dropped.1........ 
| 1 | t + t1 | c | 2 | f + t1_2460000 | c | 1 | f +(3 rows) + +prepare p1(int) as insert into t1(c) VALUES ($1) ON CONFLICT (c) DO NOTHING; +SET citus.log_remote_commands TO ON; +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +execute p1(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING +prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c; +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE 
(c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p2(8); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +prepare p3(int) as INSERT INTO t1(c) VALUES ($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1) ON CONFLICT DO NOTHING; +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +execute 
p3(8); +NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +\c - - - :master_port +DROP SCHEMA local_shard_execution_dropped_column CASCADE; +NOTICE: drop cascades to table local_shard_execution_dropped_column.t1 diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index ac5206d4b..172b082ad 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -54,6 +54,9 @@ test: multi_mx_insert_select_repartition test: locally_execute_intermediate_results test: multi_mx_alter_distributed_table +# should be executed sequentially because it modifies metadata +test: local_shard_execution_dropped_column + # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/sql/local_shard_execution_dropped_column.sql b/src/test/regress/sql/local_shard_execution_dropped_column.sql new file mode 100644 index 000000000..b3473a1f2 --- /dev/null +++ b/src/test/regress/sql/local_shard_execution_dropped_column.sql @@ -0,0 +1,135 @@ +CREATE SCHEMA local_shard_execution_dropped_column; +SET search_path TO local_shard_execution_dropped_column; + +SET citus.next_shard_id TO 2460000; + +-- the scenario is described on https://github.com/citusdata/citus/issues/5038 + +-- first stop the metadata syncing to the node do that drop column +-- is not propogated +SELECT stop_metadata_sync_to_node('localhost',:worker_1_port); +SELECT stop_metadata_sync_to_node('localhost',:worker_2_port); + +-- create a distributed table, drop a column and sync the metadata +SET citus.shard_replication_factor TO 1; +CREATE TABLE t1 (a int, b int, c int UNIQUE); +SELECT create_distributed_table('t1', 'c'); +ALTER TABLE t1 DROP COLUMN b; +SELECT start_metadata_sync_to_node('localhost',:worker_1_port); +SELECT start_metadata_sync_to_node('localhost',:worker_2_port); + +\c - - - :worker_1_port +SET search_path TO local_shard_execution_dropped_column; + +-- show the dropped columns +SELECT attrelid::regclass, attname, attnum, attisdropped +FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid') +ORDER BY 1, 3, 2, 4; + +-- connect to a worker node where local execution is done +prepare p1(int) as insert into t1(a,c) VALUES (5,$1) ON CONFLICT (c) DO NOTHING; +SET citus.log_remote_commands TO ON; +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); + +prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c; +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); + +prepare p3(int) as INSERT INTO t1(a,c) VALUES (5, $1), (6, $1), (7, $1),(5, $1), (6, $1), (7, $1) ON CONFLICT DO NOTHING; +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); + +prepare p4(int) as UPDATE t1 SET a = a + 1 WHERE c = $1; +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); +execute p4(8); + +\c - - - :master_port + +-- one another combination is 
that the shell table +-- has a dropped column but not the shard, via rebalance operation +SET search_path TO local_shard_execution_dropped_column; +ALTER TABLE t1 DROP COLUMN a; + +SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + +\c - - - :worker_2_port +SET search_path TO local_shard_execution_dropped_column; + +-- show the dropped columns +SELECT attrelid::regclass, attname, attnum, attisdropped +FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid') +ORDER BY 1, 3, 2, 4; + +prepare p1(int) as insert into t1(c) VALUES ($1) ON CONFLICT (c) DO NOTHING; +SET citus.log_remote_commands TO ON; +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); +execute p1(8); + +prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c; +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); +execute p2(8); + +prepare p3(int) as INSERT INTO t1(c) VALUES ($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1) ON CONFLICT DO NOTHING; +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); +execute p3(8); + +\c - - - :master_port +DROP SCHEMA local_shard_execution_dropped_column CASCADE;
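A note on the prepared statements exercised above: each one is EXECUTEd many times because the cached local plan path is only taken once PostgreSQL switches the prepared statement to a generic plan (typically after the fifth execution), at which point the job query still contains unresolved $1 parameters. Parsing the deparsed shard query then needs the parameter type OIDs, which come from the ParamListInfo that CitusBeginReadOnlyScan()/CitusBeginModifyScan() now pass down. The sketch below mirrors ExtractParameterTypesForParamListInfo() from local_plan_cache.c; the helper name is hypothetical and error handling is omitted.

/*
 * Sketch only (hypothetical helper): pull the parameter type OIDs out of the
 * executor's ParamListInfo so the shard query string can be parsed with the
 * right types for its $n placeholders.
 */
static int
ShardQueryParameterTypesSketch(ParamListInfo executorParams, Oid **parameterTypes)
{
    const char **parameterValues = NULL; /* required by the API, unused here */
    int numberOfParameters = 0;

    *parameterTypes = NULL;

    if (executorParams != NULL)
    {
        /* work on a copy of the executor's list, as local_plan_cache.c does */
        ParamListInfo paramsCopy = copyParamList(executorParams);

        ExtractParametersForLocalExecution(paramsCopy, parameterTypes,
                                           &parameterValues);
        numberOfParameters = paramsCopy->numParams;
    }

    return numberOfParameters;
}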