Replan non-fastpath modify queries if shards are missing at CitusBeginModifyScan

reroute-non-fastpath
aykutbozkurt 2022-09-23 11:05:47 +03:00
parent 8e5ba45b74
commit 6b0f64e0b2
12 changed files with 1285 additions and 203 deletions

View File

@ -341,6 +341,8 @@ static void FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDes
static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
static LocalCopyStatus GetLocalCopyStatus(void); static LocalCopyStatus GetLocalCopyStatus(void);
static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList);
static void EnsureHaveShardsToCopy(List *shardIntervalList, bool isHashDistributed,
char *relationName);
static void LogLocalCopyToRelationExecution(uint64 shardId); static void LogLocalCopyToRelationExecution(uint64 shardId);
static void LogLocalCopyToFileExecution(uint64 shardId); static void LogLocalCopyToFileExecution(uint64 shardId);
@ -2023,6 +2025,36 @@ ShardIntervalListHasLocalPlacements(List *shardIntervalList)
} }
/*
* EnsureHaveShardsToCopy errors out if the given shard interval list
* contains no shards to copy into.
*/
static void
EnsureHaveShardsToCopy(List *shardIntervalList, bool isHashDistributed,
char *relationName)
{
if (shardIntervalList == NIL)
{
if (isHashDistributed)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName),
errhint("Run master_create_worker_shards to create shards "
"and try again.")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName)));
}
}
}
/* /*
* CitusCopyDestReceiverStartup implements the rStartup interface of * CitusCopyDestReceiverStartup implements the rStartup interface of
* CitusCopyDestReceiver. It opens the relation, acquires necessary * CitusCopyDestReceiver. It opens the relation, acquires necessary
@ -2056,26 +2088,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->tupleDescriptor = inputTupleDescriptor; copyDest->tupleDescriptor = inputTupleDescriptor;
/* load the list of shards and verify that we have shards to copy into */ /* load the list of shards and verify that we have shards to copy into */
List *shardIntervalList = LoadShardIntervalList(tableId); List *shardIntervalList = LoadShardIntervalListWithRetry(tableId, ShareLock);
if (shardIntervalList == NIL) bool isHashDistributed = IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED);
{ EnsureHaveShardsToCopy(shardIntervalList, isHashDistributed, relationName);
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED))
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName),
errhint("Run master_create_worker_shards to create shards "
"and try again.")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName)));
}
}
/* error if any shard missing min/max values */ /* error if any shard missing min/max values */
if (cacheEntry->hasUninitializedShardInterval) if (cacheEntry->hasUninitializedShardInterval)
@ -2091,9 +2106,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
} }
} }
/* prevent concurrent placement changes and non-commutative DML statements */
LockShardListMetadata(shardIntervalList, ShareLock);
/* /*
* Prevent concurrent UPDATE/DELETE on replication factor >1 * Prevent concurrent UPDATE/DELETE on replication factor >1
* (see AcquireExecutorMultiShardLocks() at multi_router_executor.c) * (see AcquireExecutorMultiShardLocks() at multi_router_executor.c)

View File

@ -39,8 +39,9 @@
#include "distributed/function_call_delegation.h" #include "distributed/function_call_delegation.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "optimizer/clauses.h" #include "optimizer/clauses.h"
#include "optimizer/optimizer.h"
#include "optimizer/planner.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/rel.h" #include "utils/rel.h"
@ -61,6 +62,7 @@ static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int efla
static void CitusPreExecScan(CitusScanState *scanState); static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob); static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob); static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static DistributedPlan * RePlanNonFastPathQuery(DistributedPlan *distributedPlan);
static void RegenerateTaskListForInsert(Job *workerJob); static void RegenerateTaskListForInsert(Job *workerJob);
static DistributedPlan * CopyDistributedPlanWithoutCache( static DistributedPlan * CopyDistributedPlanWithoutCache(
DistributedPlan *originalDistributedPlan); DistributedPlan *originalDistributedPlan);
@ -412,12 +414,38 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
/* /*
* In case of a split, the shard might no longer be available. In that * In case of a split, the shard might no longer be available. In that
* case try to reroute. We can only do this for fast path queries. * case, try to reroute. For fast-path queries we can recompute the task
* directly; non-fastpath queries have to be replanned because their shard
* pruning depends directly on the postgres planner. (This could be avoided
* if enough information were carried over from the planning phase to recompute the tasks, but that is more complex.)
*/ */
if (currentPlan->fastPathRouterPlan && if (!AnchorShardsInTaskListExist(workerJob->taskList))
!AnchorShardsInTaskListExist(workerJob->taskList))
{ {
TryToRerouteFastPathModifyQuery(workerJob); if (currentPlan->fastPathRouterPlan)
{
TryToRerouteFastPathModifyQuery(workerJob);
}
else
{
/*
* We should only replan if we have a valid topLevelQueryContext, which
* means this plan is the top-level plan and not a subplan.
*/
if (originalDistributedPlan->topLevelQueryContext)
{
DistributedPlan *newDistributedPlan = RePlanNonFastPathQuery(
originalDistributedPlan);
scanState->distributedPlan = newDistributedPlan;
/*
* switch back to the old memory context and restart CitusBeginModifyScan,
* which may regenerate the tasks due to deferred pruning
*/
MemoryContextSwitchTo(oldContext);
CitusBeginModifyScan((CustomScanState *) scanState, estate, eflags);
return;
}
}
} }
/* ensure there is no invalid shard */ /* ensure there is no invalid shard */
@ -658,6 +686,35 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
} }
/*
* RePlanNonFastPathQuery replans the original query that was stored in the
* distributed plan at the start of planning.
*
* This function should only be used when we detect a missing shard at
* execution time.
*/
static DistributedPlan *
RePlanNonFastPathQuery(DistributedPlan *oldPlan)
{
Assert(!oldPlan->fastPathRouterPlan);
/* extract top level query info from the TopLevelQueryContext stored in the old plan */
TopLevelQueryContext *topLevelQueryContext = oldPlan->topLevelQueryContext;
Query *originalQuery = topLevelQueryContext->query;
int cursorOptions = topLevelQueryContext->cursorOptions;
ParamListInfo boundParams = topLevelQueryContext->boundParams;
/* replan the top level query */
PlannedStmt *plannedStmt = planner(originalQuery, NULL, cursorOptions, boundParams);
/* return the new plan */
DistributedPlan *newDistributedPlan = GetDistributedPlan(
(CustomScan *) plannedStmt->planTree);
return newDistributedPlan;
}
/* /*
* AdaptiveExecutorCreateScan creates the scan state for the adaptive executor. * AdaptiveExecutorCreateScan creates the scan state for the adaptive executor.
*/ */
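
A condensed sketch of the new missing-shard handling in CitusBeginModifyScan: it reuses that function's local variables, omits the memory-context bookkeeping shown in the hunk above, and illustrates the control flow rather than reproducing the patch line for line:

    /* illustrative paraphrase, not the exact patch */
    if (!AnchorShardsInTaskListExist(workerJob->taskList))
    {
        if (currentPlan->fastPathRouterPlan)
        {
            /* fast path: the target shard can be recomputed directly */
            TryToRerouteFastPathModifyQuery(workerJob);
        }
        else if (originalDistributedPlan->topLevelQueryContext != NULL)
        {
            /*
             * non-fastpath: shard pruning depends on the postgres planner,
             * so replan the stored top-level query and restart the
             * begin-scan logic with the fresh plan
             */
            scanState->distributedPlan =
                RePlanNonFastPathQuery(originalDistributedPlan);

            MemoryContextSwitchTo(oldContext);
            CitusBeginModifyScan((CustomScanState *) scanState, estate, eflags);
            return;
        }

        /* subplans have no topLevelQueryContext and fall through unchanged */
    }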

View File

@ -1141,6 +1141,30 @@ LoadShardIntervalList(Oid relationId)
} }
/*
* LoadShardIntervalListWithRetry loads the shards of the given relation,
* acquires shard metadata locks with the specified lock mode, and then
* reloads the shard list.
*
* It should be called when a concurrent shard modification might happen and
* we want to minimize errors caused by missing shards.
*/
List *
LoadShardIntervalListWithRetry(Oid relationId, LOCKMODE lockmode)
{
List *shardList = LoadShardIntervalList(relationId);
/*
* Block until conflicting locks on the shards are released, then reload the
* shard list to minimize 'missing shard' errors. (Reloading does not remove
* the error entirely; it only reduces the likelihood.)
*/
LockShardListMetadata(shardList, lockmode);
shardList = LoadShardIntervalList(relationId);
return shardList;
}
/* /*
* LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a * LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a
* given distributed table. The function returns an empty list if no shards can be found * given distributed table. The function returns an empty list if no shards can be found
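
A minimal usage sketch for the lock-then-reload helper above. The caller name is hypothetical and the snippet assumes the surrounding Citus headers are already in scope; it only illustrates the intended calling pattern:

    /* hypothetical caller, for illustration only */
    static List *
    ExampleLoadShardsForModify(Oid relationId)
    {
        /*
         * Block behind concurrent shard metadata changes (e.g. a split that
         * is committing), then use the reloaded list so that it reflects the
         * newly created shards instead of the dropped ones.
         */
        List *shardIntervalList =
            LoadShardIntervalListWithRetry(relationId, ShareLock);

        if (shardIntervalList == NIL)
        {
            ereport(ERROR, (errmsg("no shards found for relation %u", relationId)));
        }

        return shardIntervalList;
    }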

View File

@ -74,6 +74,9 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */ /* keep track of planner call stack levels */
int PlannerLevel = 0; int PlannerLevel = 0;
/* store the untransformed query and boundParams so that we can replan when necessary */
static TopLevelQueryContext *TopLevelQueryCtx = NULL;
static void ErrorIfQueryHasMergeCommand(Query *queryTree); static void ErrorIfQueryHasMergeCommand(Query *queryTree);
static bool ContainsMergeCommandWalker(Node *node); static bool ContainsMergeCommandWalker(Node *node);
static bool ListContainsDistributedTableRTE(List *rangeTableList, static bool ListContainsDistributedTableRTE(List *rangeTableList,
@ -130,7 +133,8 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
static RTEListProperties * GetRTEListProperties(List *rangeTableList); static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex); static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList); static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static void PersistTopLevelQueryInfo(Query *query, ParamListInfo boundParams, int
cursorOptions);
/* Distributed planner hook */ /* Distributed planner hook */
PlannedStmt * PlannedStmt *
@ -175,6 +179,20 @@ distributed_planner(Query *parse,
} }
} }
/*
* We should only store TopLevelQueryCtx when PlannerLevel = 0, because we
* want to capture the top-level query info that is required to replan
* non-fastpath queries if we find a missing shard just before execution.
*
* TopLevelQueryCtx keeps pointing to the top-level query info even if we
* call the planner recursively.
*/
if (PlannerLevel == 0 && needsDistributedPlanning)
{
PersistTopLevelQueryInfo(parse, boundParams, cursorOptions);
}
int rteIdCounter = 1; int rteIdCounter = 1;
DistributedPlanningContext planContext = { DistributedPlanningContext planContext = {
@ -282,6 +300,24 @@ distributed_planner(Query *parse,
/* remove the context from the context list */ /* remove the context from the context list */
PopPlannerRestrictionContext(); PopPlannerRestrictionContext();
/*
* When PlannerLevel = 0 we are sure that this is the top-level plan (the
* planner recursion has ended), so we can store the top-level query info
* inside the top-level Citus plan, if any.
*/
if (PlannerLevel == 0 && needsDistributedPlanning)
{
Assert(TopLevelQueryCtx);
if (result && IsCitusCustomScan(result->planTree))
{
DistributedPlan *distPlan = GetDistributedPlan(
(CustomScan *) result->planTree);
distPlan->topLevelQueryContext = TopLevelQueryCtx;
}
TopLevelQueryCtx = NULL;
}
/* /*
* In some cases, for example; parameterized SQL functions, we may miss that * In some cases, for example; parameterized SQL functions, we may miss that
* there is a need for distributed planning. Such cases only become clear after * there is a need for distributed planning. Such cases only become clear after
@ -302,6 +338,28 @@ distributed_planner(Query *parse,
} }
/*
* PersistTopLevelQueryInfo stores the given top-level query information in
* the global TopLevelQueryCtx so that we can replan if we detect missing
* shards just before execution.
*/
static void
PersistTopLevelQueryInfo(Query *query, ParamListInfo boundParams, int cursorOptions)
{
/* ensure this is only called from the top-level planner invocation */
Assert(PlannerLevel == 0);
/* we need to keep the unmodified top-level query information for the duration of the transaction */
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
TopLevelQueryCtx = palloc0(sizeof(TopLevelQueryContext));
TopLevelQueryCtx->query = copyObject(query);
TopLevelQueryCtx->cursorOptions = cursorOptions;
TopLevelQueryCtx->boundParams = copyParamList(boundParams);
MemoryContextSwitchTo(oldContext);
}
/* /*
* ErrorIfQueryHasMergeCommand walks over the query tree and throws error * ErrorIfQueryHasMergeCommand walks over the query tree and throws error
* if there are any Merge command (e.g., CMD_MERGE) in the query tree. * if there are any Merge command (e.g., CMD_MERGE) in the query tree.
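
The two halves of the replan mechanism live in different files: the planner hook above captures the untransformed query at PlannerLevel == 0 and attaches it to the top-level DistributedPlan, while CitusBeginModifyScan feeds it back into the standard planner when an anchor shard is missing. The sketch below ties the two together; ExampleReplayStoredQuery is a hypothetical name that mirrors RePlanNonFastPathQuery from this commit:

    /*
     * Lifecycle of TopLevelQueryContext (simplified):
     *   1. distributed_planner(), at PlannerLevel == 0, copies the untransformed
     *      query, cursorOptions and boundParams into TopTransactionContext and
     *      stores them in the resulting plan's topLevelQueryContext.
     *   2. CitusBeginModifyScan(), on a missing anchor shard in a non-fastpath
     *      plan, replays the stored pieces through the planner as below.
     */
    static DistributedPlan *
    ExampleReplayStoredQuery(TopLevelQueryContext *ctx)
    {
        /* same call shape as RePlanNonFastPathQuery in this commit */
        PlannedStmt *plannedStmt = planner(ctx->query, NULL,
                                           ctx->cursorOptions, ctx->boundParams);

        return GetDistributedPlan((CustomScan *) plannedStmt->planTree);
    }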

View File

@ -278,7 +278,10 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId); CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
int shardCount = targetCacheEntry->shardIntervalArrayLength;
/* grab shared metadata lock to stop concurrent placement additions */
List *shardIntervalList = LoadShardIntervalListWithRetry(targetRelationId, ShareLock);
RelationRestrictionContext *relationRestrictionContext = RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext; plannerRestrictionContext->relationRestrictionContext;
bool allReferenceTables = relationRestrictionContext->allReferenceTables; bool allReferenceTables = relationRestrictionContext->allReferenceTables;
@ -310,11 +313,9 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
* the current shard boundaries. Finally, perform the normal shard pruning to * the current shard boundaries. Finally, perform the normal shard pruning to
* decide on whether to push the query to the current shard or not. * decide on whether to push the query to the current shard or not.
*/ */
for (int shardOffset = 0; shardOffset < shardCount; shardOffset++) ShardInterval *targetShardInterval = NULL;
foreach_ptr(targetShardInterval, shardIntervalList)
{ {
ShardInterval *targetShardInterval =
targetCacheEntry->sortedShardIntervalArray[shardOffset];
Task *modifyTask = RouterModifyTaskForShardInterval(originalQuery, Task *modifyTask = RouterModifyTaskForShardInterval(originalQuery,
targetCacheEntry, targetCacheEntry,
targetShardInterval, targetShardInterval,
@ -798,9 +799,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
copyOfPlannerRestrictionContext->relationRestrictionContext-> copyOfPlannerRestrictionContext->relationRestrictionContext->
relationRestrictionList; relationRestrictionList;
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
/* /*
* Replace the partitioning qual parameter value in all baserestrictinfos. * Replace the partitioning qual parameter value in all baserestrictinfos.
* Note that this has to be done on a copy, as the walker modifies in place. * Note that this has to be done on a copy, as the walker modifies in place.

View File

@ -178,6 +178,17 @@ typedef struct DistributedPlanningContext
} DistributedPlanningContext; } DistributedPlanningContext;
/*
* TopLevelQueryContext stores what we need to replan a query when a missing shard is detected just before execution
*/
typedef struct TopLevelQueryContext
{
Query *query;
int cursorOptions;
ParamListInfo boundParams;
} TopLevelQueryContext;
/* /*
* CitusCustomScanPath is injected into the planner during the combine query planning * CitusCustomScanPath is injected into the planner during the combine query planning
* phase of the logical planner. * phase of the logical planner.

View File

@ -280,6 +280,7 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS);
/* Function declarations to read shard and shard placement data */ /* Function declarations to read shard and shard placement data */
extern uint32 TableShardReplicationFactor(Oid relationId); extern uint32 TableShardReplicationFactor(Oid relationId);
extern List * LoadShardIntervalList(Oid relationId); extern List * LoadShardIntervalList(Oid relationId);
extern List * LoadShardIntervalListWithRetry(Oid relationId, LOCKMODE lockmode);
extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId); extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId);
extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId); extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId);
extern int ShardIntervalCount(Oid relationId); extern int ShardIntervalCount(Oid relationId);

View File

@ -457,6 +457,13 @@ typedef struct DistributedPlan
* or if prepared statement parameters prevented successful planning. * or if prepared statement parameters prevented successful planning.
*/ */
DeferredErrorMessage *planningError; DeferredErrorMessage *planningError;
/*
* Used to replan the query just before execution when any task has a missing
* shard. We replan so that the query can still succeed instead of surfacing
* an unfriendly 'missing shard' error to the user.
*/
TopLevelQueryContext *topLevelQueryContext;
} DistributedPlan; } DistributedPlan;

View File

@ -291,7 +291,6 @@ step s2-commit:
COMMIT; COMMIT;
step s1-copy: <... completed> step s1-copy: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit: step s1-commit:
COMMIT; COMMIT;
@ -308,14 +307,19 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500001|t | 0 57637|1500001|t | 4
57637|1500003|t | 0 57637|1500003|t | 0
57638|1500004|t | 0 57638|1500004|t | 1
(3 rows) (3 rows)
id|value id|value
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) 1| 1
2| 2
3| 3
4| 4
5| 5
(5 rows)
starting permutation: s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-update s2-commit s1-commit s2-print-cluster starting permutation: s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-update s2-commit s1-commit s2-print-cluster
@ -593,7 +597,6 @@ step s2-commit:
COMMIT; COMMIT;
step s1-copy: <... completed> step s1-copy: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit: step s1-commit:
COMMIT; COMMIT;
@ -610,14 +613,19 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500001|t | 0 57637|1500001|t | 4
57637|1500003|t | 0 57637|1500003|t | 0
57638|1500004|t | 0 57638|1500004|t | 1
(3 rows) (3 rows)
id|value id|value
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) 1| 1
2| 2
3| 3
4| 4
5| 5
(5 rows)
starting permutation: s1-load-cache s1-insert s1-begin s1-blocking-shard-split s2-blocking-shard-split s1-commit s2-print-cluster starting permutation: s1-load-cache s1-insert s1-begin s1-blocking-shard-split s2-blocking-shard-split s1-commit s2-print-cluster

File diff suppressed because it is too large.

View File

@ -59,7 +59,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500090 1500162
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -78,10 +78,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500089|t | 0 57637|1500161|t | 0
57637|1500090|t | 1 57637|1500162|t | 1
57637|1500091|t | 0 57637|1500163|t | 0
57638|1500086|t | 0 57638|1500158|t | 0
(4 rows) (4 rows)
id|value id|value
@ -149,7 +149,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500098 1500170
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -168,10 +168,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500097|t | 0 57637|1500169|t | 0
57637|1500098|t | 0 57637|1500170|t | 0
57637|1500099|t | 0 57637|1500171|t | 0
57638|1500094|t | 0 57638|1500166|t | 0
(4 rows) (4 rows)
id|value id|value
@ -240,7 +240,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500106 1500178
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -259,10 +259,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500105|t | 0 57637|1500177|t | 0
57637|1500106|t | 1 57637|1500178|t | 1
57637|1500107|t | 0 57637|1500179|t | 0
57638|1500102|t | 0 57638|1500174|t | 0
(4 rows) (4 rows)
id|value id|value
@ -327,7 +327,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500114 1500186
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -346,10 +346,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500113|t | 0 57637|1500185|t | 0
57637|1500114|t | 1 57637|1500186|t | 1
57637|1500115|t | 0 57637|1500187|t | 0
57638|1500110|t | 0 57638|1500182|t | 0
(4 rows) (4 rows)
id|value id|value
@ -413,7 +413,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500122 1500194
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -432,10 +432,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500121|t | 1 57637|1500193|t | 1
57637|1500122|t | 1 57637|1500194|t | 1
57637|1500123|t | 2 57637|1500195|t | 2
57638|1500118|t | 1 57638|1500190|t | 1
(4 rows) (4 rows)
id|value id|value
@ -503,7 +503,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500130 1500202
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -522,10 +522,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500129|t | 0 57637|1500201|t | 0
57637|1500130|t | 1 57637|1500202|t | 1
57637|1500131|t | 0 57637|1500203|t | 0
57638|1500126|t | 0 57638|1500198|t | 0
(4 rows) (4 rows)
id|value id|value
@ -589,7 +589,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500138 1500210
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -608,10 +608,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500137|t | 0 57637|1500209|t | 0
57637|1500138|t | 0 57637|1500210|t | 0
57637|1500139|t | 0 57637|1500211|t | 0
57638|1500134|t | 0 57638|1500206|t | 0
(4 rows) (4 rows)
id|value id|value
@ -676,7 +676,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500146 1500218
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -695,10 +695,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500145|t | 0 57637|1500217|t | 0
57637|1500146|t | 1 57637|1500218|t | 1
57637|1500147|t | 0 57637|1500219|t | 0
57638|1500142|t | 0 57638|1500214|t | 0
(4 rows) (4 rows)
id|value id|value
@ -759,7 +759,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500154 1500226
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -778,10 +778,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500153|t | 0 57637|1500225|t | 0
57637|1500154|t | 1 57637|1500226|t | 1
57637|1500155|t | 0 57637|1500227|t | 0
57638|1500150|t | 0 57638|1500222|t | 0
(4 rows) (4 rows)
id|value id|value
@ -841,7 +841,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500162 1500234
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -860,10 +860,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500161|t | 1 57637|1500233|t | 1
57637|1500162|t | 1 57637|1500234|t | 1
57637|1500163|t | 2 57637|1500235|t | 2
57638|1500158|t | 1 57638|1500230|t | 1
(4 rows) (4 rows)
id|value id|value
@ -916,7 +916,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500170 1500242
(1 row) (1 row)
step s2-print-cluster: step s2-print-cluster:
@ -932,10 +932,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500169|t | 0 57637|1500241|t | 0
57637|1500170|t | 1 57637|1500242|t | 1
57637|1500171|t | 0 57637|1500243|t | 0
57638|1500166|t | 0 57638|1500238|t | 0
(4 rows) (4 rows)
id|value id|value
@ -984,7 +984,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500178 1500250
(1 row) (1 row)
step s2-print-cluster: step s2-print-cluster:
@ -1000,10 +1000,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500177|t | 0 57637|1500249|t | 0
57637|1500178|t | 1 57637|1500250|t | 1
57637|1500179|t | 0 57637|1500251|t | 0
57638|1500174|t | 0 57638|1500246|t | 0
(4 rows) (4 rows)
id|value id|value
@ -1052,7 +1052,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500186 1500258
(1 row) (1 row)
step s2-print-cluster: step s2-print-cluster:
@ -1068,10 +1068,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500185|t | 0 57637|1500257|t | 0
57637|1500186|t | 1 57637|1500258|t | 1
57637|1500187|t | 0 57637|1500259|t | 0
57638|1500182|t | 0 57638|1500254|t | 0
(4 rows) (4 rows)
id|value id|value
@ -1123,7 +1123,7 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500197 1500269
(1 row) (1 row)
step s2-commit: step s2-commit:
@ -1142,10 +1142,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500196|t | 0 57637|1500268|t | 0
57637|1500197|t | 1 57637|1500269|t | 1
57637|1500198|t | 0 57637|1500270|t | 0
57638|1500193|t | 0 57638|1500265|t | 0
(4 rows) (4 rows)
id|value id|value
@ -1193,13 +1193,13 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500208 1500280
(1 row) (1 row)
step s1-isolate-tenant-no-same-coloc-blocking: <... completed> step s1-isolate-tenant-no-same-coloc-blocking: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500211 1500283
(1 row) (1 row)
step s2-print-cluster: step s2-print-cluster:
@ -1215,10 +1215,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500207|t | 0 57637|1500279|t | 0
57637|1500208|t | 1 57637|1500280|t | 1
57637|1500209|t | 0 57637|1500281|t | 0
57638|1500204|t | 0 57638|1500276|t | 0
(4 rows) (4 rows)
id|value id|value
@ -1266,13 +1266,13 @@ t
step s2-isolate-tenant: <... completed> step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500219 1500291
(1 row) (1 row)
step s1-isolate-tenant-no-same-coloc-blocking: <... completed> step s1-isolate-tenant-no-same-coloc-blocking: <... completed>
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
--------------------------------------------------------------------- ---------------------------------------------------------------------
1500222 1500294
(1 row) (1 row)
step s2-print-cluster: step s2-print-cluster:
@ -1288,10 +1288,10 @@ step s2-print-cluster:
nodeport|shardid|success|result nodeport|shardid|success|result
--------------------------------------------------------------------- ---------------------------------------------------------------------
57637|1500218|t | 0 57637|1500290|t | 0
57637|1500219|t | 1 57637|1500291|t | 1
57637|1500220|t | 0 57637|1500292|t | 0
57638|1500215|t | 0 57638|1500287|t | 0
(4 rows) (4 rows)
id|value id|value

View File

@ -39,18 +39,50 @@ step "s1-insert"
INSERT INTO isolation_table VALUES (5, 10); INSERT INTO isolation_table VALUES (5, 10);
} }
step "s1-pushable-insert-select"
{
INSERT INTO isolation_table SELECT * FROM isolation_table;
}
step "s1-nonpushable-insert-select-with-copy"
{
INSERT INTO isolation_table SELECT 5;
}
step "s1-nonpushable-insert-select-returning"
{
INSERT INTO isolation_table SELECT 5 RETURNING *;
}
step "s1-nonpushable-insert-select-conflict"
{
INSERT INTO isolation_table SELECT 5 ON CONFLICT DO NOTHING;
}
step "s1-update" step "s1-update"
{ {
UPDATE isolation_table SET value = 5 WHERE id = 5; UPDATE isolation_table SET value = 5 WHERE id = 5;
} }
step "s1-update-complex" step "s1-update-recursive"
{ {
UPDATE isolation_table SET value = 5 WHERE id IN ( UPDATE isolation_table SET value = 5 WHERE id IN (
SELECT max(id) FROM isolation_table SELECT max(id) FROM isolation_table
); );
} }
step "s1-update-cte"
{
-- here the UPDATE CTE is a subplan of the SELECT plan; we do not replan a subplan if a shard is missing at CitusBeginModifyScan
WITH cte_1 AS (UPDATE isolation_table SET value = value RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2;
}
step "s1-update-select-cte"
{
-- here the UPDATE plan is the top-level plan (the SELECT is just a CTE inside it); we can replan a top-level plan if a shard is missing at CitusBeginModifyScan
WITH cte_1 AS ( SELECT * FROM isolation_table ORDER BY 1,2 ) UPDATE isolation_table SET value = value WHERE EXISTS(SELECT * FROM cte_1);
}
step "s1-delete" step "s1-delete"
{ {
DELETE FROM isolation_table WHERE id = 5; DELETE FROM isolation_table WHERE id = 5;
@ -123,23 +155,45 @@ step "s2-print-index-count"
} }
// run tenant isolation while concurrently performing a DML and index creation // run tenant isolation while concurrently performing a DML and index creation
// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed // we expect DML/DDL queries to be rerouted to valid shards even if the shard they are waiting for is destroyed
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-complex" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-pushable-insert-select" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-with-copy" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-returning" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-conflict" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"
// the same tests without loading the cache at first // the same tests without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-complex" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-pushable-insert-select" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-with-copy" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-returning" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-conflict" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"
// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-recursive" "s2-commit" "s1-commit" "s2-print-cluster"
// the same tests without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-recursive" "s2-commit" "s1-commit" "s2-print-cluster"
// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-cte" "s2-commit" "s1-commit" "s2-print-cluster"
// the same tests without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-cte" "s2-commit" "s1-commit" "s2-print-cluster"
// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-select-cte" "s2-commit" "s1-commit" "s2-print-cluster"
// the same tests without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-select-cte" "s2-commit" "s1-commit" "s2-print-cluster"
// concurrent tenant isolation blocks on different shards of the same table (or any colocated table) // concurrent tenant isolation blocks on different shards of the same table (or any colocated table)
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-isolate-tenant" "s2-isolate-tenant" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-isolate-tenant" "s2-isolate-tenant" "s1-commit" "s2-print-cluster"