Merge pull request #5053 from citusdata/fix_dropped_cached_plans

Deparse/parse the local cached queries
pull/5064/head
Önder Kalacı 2021-06-21 13:33:36 +03:00 committed by GitHub
commit 4e632d9da3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 623 additions and 144 deletions

View File

@ -300,7 +300,8 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
* The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement.
*/
CacheLocalPlanForShardQuery(task, originalDistributedPlan);
CacheLocalPlanForShardQuery(task, originalDistributedPlan,
estate->es_param_list_info);
}
}
@ -414,7 +415,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
* The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement.
*/
CacheLocalPlanForShardQuery(task, originalDistributedPlan);
CacheLocalPlanForShardQuery(task, originalDistributedPlan,
estate->es_param_list_info);
}
MemoryContextSwitchTo(oldContext);

View File

@ -128,9 +128,6 @@ static void LogLocalCommand(Task *task);
static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
TupleDestination *tupleDest,
Task *task);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
static void ExecuteUdfTaskQuery(Query *localUdfCommandQuery);
static void EnsureTransitionPossible(LocalExecutionStatus from,
LocalExecutionStatus to);
@ -438,7 +435,7 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tuple
* value arrays. It does not change the oid of custom types, because the
* query will be run locally.
*/
static void
void
ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
const char ***parameterValues)
{

View File

@ -41,8 +41,6 @@
static void AddInsertAliasIfNeeded(Query *query);
static void UpdateTaskQueryString(Query *query, Task *task);
static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
OnConflictExpr *onConflict);
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
static bool ShouldLazyDeparseQuery(Task *task);
@ -269,124 +267,6 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
}
/*
* UpdateRelationsToLocalShardTables walks over the query tree and appends shard ids to
* relations. The caller is responsible for ensuring that the resulting Query can
* be executed locally.
*/
bool
UpdateRelationsToLocalShardTables(Node *node, List *relationShardList)
{
if (node == NULL)
{
return false;
}
/* want to look at all RTEs, even in subqueries, CTEs and such */
if (IsA(node, Query))
{
return query_tree_walker((Query *) node, UpdateRelationsToLocalShardTables,
relationShardList, QTW_EXAMINE_RTES_BEFORE);
}
if (IsA(node, OnConflictExpr))
{
OnConflictExpr *onConflict = (OnConflictExpr *) node;
return ReplaceRelationConstraintByShardConstraint(relationShardList, onConflict);
}
if (!IsA(node, RangeTblEntry))
{
return expression_tree_walker(node, UpdateRelationsToLocalShardTables,
relationShardList);
}
RangeTblEntry *newRte = (RangeTblEntry *) node;
if (newRte->rtekind != RTE_RELATION)
{
return false;
}
RelationShard *relationShard = FindRelationShard(newRte->relid,
relationShardList);
/* the function should only be called with local shards */
if (relationShard == NULL)
{
return true;
}
Oid shardOid = GetTableLocalShardOid(relationShard->relationId,
relationShard->shardId);
newRte->relid = shardOid;
return false;
}
/*
* ReplaceRelationConstraintByShardConstraint replaces given OnConflictExpr's
* constraint id with constraint id of the corresponding shard.
*/
static bool
ReplaceRelationConstraintByShardConstraint(List *relationShardList,
OnConflictExpr *onConflict)
{
Oid constraintId = onConflict->constraint;
if (!OidIsValid(constraintId))
{
return false;
}
Oid constraintRelationId = InvalidOid;
HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintId));
if (HeapTupleIsValid(heapTuple))
{
Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(heapTuple);
constraintRelationId = contup->conrelid;
ReleaseSysCache(heapTuple);
}
/*
* We can return here without calling the walker function, since we know there
* will be no possible tables or constraints after this point, by the syntax.
*/
if (!OidIsValid(constraintRelationId))
{
ereport(ERROR, (errmsg("Invalid relation id (%u) for constraint: %s",
constraintRelationId, get_constraint_name(constraintId))));
}
RelationShard *relationShard = FindRelationShard(constraintRelationId,
relationShardList);
if (relationShard != NULL)
{
char *constraintName = get_constraint_name(constraintId);
AppendShardIdToName(&constraintName, relationShard->shardId);
Oid shardOid = GetTableLocalShardOid(relationShard->relationId,
relationShard->shardId);
Oid shardConstraintId = get_relation_constraint_oid(shardOid, constraintName,
false);
onConflict->constraint = shardConstraintId;
return false;
}
return true;
}
/*
* FindRelationShard finds the RelationShard for shard relation with
* given Oid if exists in given relationShardList. Otherwise, returns NULL.

View File

@ -16,19 +16,29 @@
#include "distributed/local_plan_cache.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/insert_select_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/version_compat.h"
#include "optimizer/optimizer.h"
#include "optimizer/clauses.h"
static Query * GetLocalShardQueryForCache(Query *jobQuery, Task *task,
ParamListInfo paramListInfo);
static char * DeparseLocalShardQuery(Query *jobQuery, List *relationShardList,
Oid anchorDistributedTableId, int64 anchorShardId);
static int ExtractParameterTypesForParamListInfo(ParamListInfo originalParamListInfo,
Oid **parameterTypes);
/*
* CacheLocalPlanForShardQuery replaces the relation OIDs in the job query
* with shard relation OIDs and then plans the query and caches the result
* in the originalDistributedPlan (which may be preserved across executions).
*/
void
CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan)
CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan,
ParamListInfo paramListInfo)
{
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
if (localPlan != NULL)
@ -54,14 +64,14 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
* We prefer to use jobQuery (over task->query) because we don't want any
* functions/params to have been evaluated in the cached plan.
*/
Query *shardQuery = copyObject(originalDistributedPlan->workerJob->jobQuery);
Query *jobQuery = copyObject(originalDistributedPlan->workerJob->jobQuery);
UpdateRelationsToLocalShardTables((Node *) shardQuery, task->relationShardList);
Query *localShardQuery = GetLocalShardQueryForCache(jobQuery, task, paramListInfo);
LOCKMODE lockMode = GetQueryLockMode(shardQuery);
LOCKMODE lockMode = GetQueryLockMode(localShardQuery);
/* fast path queries can only have a single RTE by definition */
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(shardQuery->rtable);
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(localShardQuery->rtable);
/*
* If the shard has been created in this transction, we wouldn't see the relationId
@ -69,24 +79,16 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
*/
if (rangeTableEntry->relid == InvalidOid)
{
pfree(shardQuery);
pfree(jobQuery);
pfree(localShardQuery);
MemoryContextSwitchTo(oldContext);
return;
}
if (IsLoggableLevel(DEBUG5))
{
StringInfo queryString = makeStringInfo();
pg_get_query_def(shardQuery, queryString);
ereport(DEBUG5, (errmsg("caching plan for query: %s",
queryString->data)));
}
LockRelationOid(rangeTableEntry->relid, lockMode);
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
localPlan = planner_compat(shardQuery, 0, NULL);
localPlan = planner_compat(localShardQuery, 0, NULL);
localPlannedStatement->localPlan = localPlan;
localPlannedStatement->shardId = task->anchorShardId;
localPlannedStatement->localGroupId = GetLocalGroupId();
@ -99,6 +101,128 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
}
/*
* GetLocalShardQueryForCache is a helper function which generates
* the local shard query based on the jobQuery. The function should
* not be used for generic purposes, it is specialized for local cached
* queries.
*
* It is not guaranteed to have consistent attribute numbers on the shards
* and on the shell (e.g., distributed/reference tables) due to DROP COLUMN
* commands.
*
* To avoid any edge cases due to such discrepancies, we first deparse the
* jobQuery with the tables replaced to shards, and parse the query string
* back. This is normally a very expensive operation, however we only do it
* once per cached local plan, which is acceptable.
*/
static Query *
GetLocalShardQueryForCache(Query *jobQuery, Task *task, ParamListInfo orig_paramListInfo)
{
char *shardQueryString =
DeparseLocalShardQuery(jobQuery, task->relationShardList,
task->anchorDistributedTableId,
task->anchorShardId);
ereport(DEBUG5, (errmsg("Local shard query that is going to be cached: %s",
shardQueryString)));
Oid *parameterTypes = NULL;
int numberOfParameters =
ExtractParameterTypesForParamListInfo(orig_paramListInfo, &parameterTypes);
Query *localShardQuery =
ParseQueryString(shardQueryString, parameterTypes, numberOfParameters);
return localShardQuery;
}
/*
* DeparseLocalShardQuery is a helper function to deparse given jobQuery for the shard(s)
* identified by the relationShardList, anchorDistributedTableId and anchorShardId.
*
* For the details and comparison with TaskQueryString(), see the comments in the function.
*/
static char *
DeparseLocalShardQuery(Query *jobQuery, List *relationShardList, Oid
anchorDistributedTableId, int64 anchorShardId)
{
StringInfo queryString = makeStringInfo();
/*
* We imitate what TaskQueryString() does, but we cannot rely on that function
* as the parameters might have been already resolved on the QueryTree in the
* task. Instead, we operate on the jobQuery where are sure that the
* coordination evaluation has not happened.
*
* Local shard queries are only applicable for local cached query execution.
* In the local cached query execution mode, we can use a query structure
* (or query string) with unevaluated expressions as we allow function calls
* to be evaluated when the query on the shard is executed (e.g., do no have
* coordinator evaluation, instead let Postgres executor evaluate values).
*
* Additionally, we can allow them to be evaluated again because they are stable,
* and we do not cache plans / use unevaluated query strings for queries containing
* volatile functions.
*/
if (jobQuery->commandType == CMD_INSERT)
{
/*
* We currently do not support INSERT .. SELECT here. To support INSERT..SELECT
* queries, we should update the relation names to shard names in the SELECT
* clause (e.g., UpdateRelationToShardNames()).
*/
Assert(!CheckInsertSelectQuery(jobQuery));
/*
* For INSERT queries we cannot use pg_get_query_def. Mainly because we
* cannot run UpdateRelationToShardNames on an INSERT query. This is
* because the PG deparsing logic fails when trying to insert into a
* RTE_FUNCTION (which is what will happen if you call
* UpdateRelationToShardNames).
*/
deparse_shard_query(jobQuery, anchorDistributedTableId, anchorShardId,
queryString);
}
else
{
UpdateRelationToShardNames((Node *) jobQuery, relationShardList);
pg_get_query_def(jobQuery, queryString);
}
return queryString->data;
}
/*
* ExtractParameterTypesForParamListInfo is a helper function which helps to
* extract the parameter types of the given ParamListInfo via the second
* parameter of the function.
*
* The function also returns the number of parameters. If no parameter exists,
* the function returns 0.
*/
static int
ExtractParameterTypesForParamListInfo(ParamListInfo originalParamListInfo,
Oid **parameterTypes)
{
*parameterTypes = NULL;
int numberOfParameters = 0;
if (originalParamListInfo != NULL)
{
const char **parameterValues = NULL;
ParamListInfo paramListInfo = copyParamList(originalParamListInfo);
ExtractParametersForLocalExecution(paramListInfo, parameterTypes,
&parameterValues);
numberOfParameters = paramListInfo->numParams;
}
return numberOfParameters;
}
/*
* GetCachedLocalPlan is a helper function which return the cached
* plan in the distributedPlan for the given task if exists.

View File

@ -28,7 +28,7 @@ extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern char * TaskQueryString(Task *task);
extern char * TaskQueryStringAtIndex(Task *task, int index);
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
extern int GetTaskQueryType(Task *task);
#endif /* DEPARSE_SHARD_QUERY_H */

View File

@ -46,5 +46,8 @@ extern bool TaskAccessesLocalNode(Task *task);
extern void ErrorIfTransactionAccessedPlacementsLocally(void);
extern void DisableLocalExecution(void);
extern void SetLocalExecutionStatus(LocalExecutionStatus newStatus);
extern void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
#endif /* LOCAL_EXECUTION_H */

View File

@ -5,6 +5,7 @@ extern bool IsLocalPlanCachingSupported(Job *currentJob,
DistributedPlan *originalDistributedPlan);
extern PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan);
extern void CacheLocalPlanForShardQuery(Task *task,
DistributedPlan *originalDistributedPlan);
DistributedPlan *originalDistributedPlan,
ParamListInfo paramListInfo);
#endif /* LOCAL_PLAN_CACHE */

View File

@ -0,0 +1,334 @@
CREATE SCHEMA local_shard_execution_dropped_column;
SET search_path TO local_shard_execution_dropped_column;
SET citus.next_shard_id TO 2460000;
-- the scenario is described on https://github.com/citusdata/citus/issues/5038
-- first stop the metadata syncing to the node do that drop column
-- is not propogated
SELECT stop_metadata_sync_to_node('localhost',:worker_1_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost',:worker_2_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- create a distributed table, drop a column and sync the metadata
SET citus.shard_replication_factor TO 1;
CREATE TABLE t1 (a int, b int, c int UNIQUE);
SELECT create_distributed_table('t1', 'c');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE t1 DROP COLUMN b;
SELECT start_metadata_sync_to_node('localhost',:worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost',:worker_2_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO local_shard_execution_dropped_column;
-- show the dropped columns
SELECT attrelid::regclass, attname, attnum, attisdropped
FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid')
ORDER BY 1, 3, 2, 4;
attrelid | attname | attnum | attisdropped
---------------------------------------------------------------------
t1_2460000 | a | 1 | f
t1_2460000 | ........pg.dropped.2........ | 2 | t
t1_2460000 | c | 3 | f
t1 | a | 1 | f
t1 | c | 2 | f
(5 rows)
-- connect to a worker node where local execution is done
prepare p1(int) as insert into t1(a,c) VALUES (5,$1) ON CONFLICT (c) DO NOTHING;
SET citus.log_remote_commands TO ON;
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5, 8) ON CONFLICT(c) DO NOTHING
prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c;
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
prepare p3(int) as INSERT INTO t1(a,c) VALUES (5, $1), (6, $1), (7, $1),(5, $1), (6, $1), (7, $1) ON CONFLICT DO NOTHING;
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (a, c) VALUES (5,8), (6,8), (7,8), (5,8), (6,8), (7,8) ON CONFLICT DO NOTHING
prepare p4(int) as UPDATE t1 SET a = a + 1 WHERE c = $1;
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
\c - - - :master_port
-- one another combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;
SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO local_shard_execution_dropped_column;
-- show the dropped columns
SELECT attrelid::regclass, attname, attnum, attisdropped
FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid')
ORDER BY 1, 3, 2, 4;
attrelid | attname | attnum | attisdropped
---------------------------------------------------------------------
t1 | ........pg.dropped.1........ | 1 | t
t1 | c | 2 | f
t1_2460000 | c | 1 | f
(3 rows)
prepare p1(int) as insert into t1(c) VALUES ($1) ON CONFLICT (c) DO NOTHING;
SET citus.log_remote_commands TO ON;
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
execute p1(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8) ON CONFLICT(c) DO NOTHING
prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c;
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p2(8);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE (c OPERATOR(pg_catalog.=) 8) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
prepare p3(int) as INSERT INTO t1(c) VALUES ($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1) ON CONFLICT DO NOTHING;
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;
NOTICE: drop cascades to table local_shard_execution_dropped_column.t1

View File

@ -54,6 +54,9 @@ test: multi_mx_insert_select_repartition
test: locally_execute_intermediate_results
test: multi_mx_alter_distributed_table
# should be executed sequentially because it modifies metadata
test: local_shard_execution_dropped_column
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak

View File

@ -0,0 +1,135 @@
CREATE SCHEMA local_shard_execution_dropped_column;
SET search_path TO local_shard_execution_dropped_column;
SET citus.next_shard_id TO 2460000;
-- the scenario is described on https://github.com/citusdata/citus/issues/5038
-- first stop the metadata syncing to the node do that drop column
-- is not propogated
SELECT stop_metadata_sync_to_node('localhost',:worker_1_port);
SELECT stop_metadata_sync_to_node('localhost',:worker_2_port);
-- create a distributed table, drop a column and sync the metadata
SET citus.shard_replication_factor TO 1;
CREATE TABLE t1 (a int, b int, c int UNIQUE);
SELECT create_distributed_table('t1', 'c');
ALTER TABLE t1 DROP COLUMN b;
SELECT start_metadata_sync_to_node('localhost',:worker_1_port);
SELECT start_metadata_sync_to_node('localhost',:worker_2_port);
\c - - - :worker_1_port
SET search_path TO local_shard_execution_dropped_column;
-- show the dropped columns
SELECT attrelid::regclass, attname, attnum, attisdropped
FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid')
ORDER BY 1, 3, 2, 4;
-- connect to a worker node where local execution is done
prepare p1(int) as insert into t1(a,c) VALUES (5,$1) ON CONFLICT (c) DO NOTHING;
SET citus.log_remote_commands TO ON;
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c;
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
prepare p3(int) as INSERT INTO t1(a,c) VALUES (5, $1), (6, $1), (7, $1),(5, $1), (6, $1), (7, $1) ON CONFLICT DO NOTHING;
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
prepare p4(int) as UPDATE t1 SET a = a + 1 WHERE c = $1;
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
execute p4(8);
\c - - - :master_port
-- one another combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;
SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
\c - - - :worker_2_port
SET search_path TO local_shard_execution_dropped_column;
-- show the dropped columns
SELECT attrelid::regclass, attname, attnum, attisdropped
FROM pg_attribute WHERE attrelid IN ('t1'::regclass, 't1_2460000'::regclass) and attname NOT IN ('tableoid','cmax', 'xmax', 'cmin', 'xmin', 'ctid')
ORDER BY 1, 3, 2, 4;
prepare p1(int) as insert into t1(c) VALUES ($1) ON CONFLICT (c) DO NOTHING;
SET citus.log_remote_commands TO ON;
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
execute p1(8);
prepare p2(int) as SELECT count(*) FROM t1 WHERE c = $1 GROUP BY c;
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
execute p2(8);
prepare p3(int) as INSERT INTO t1(c) VALUES ($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1),($1) ON CONFLICT DO NOTHING;
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
execute p3(8);
\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;