diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b5ac6a519..ca9faa9f5 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -341,6 +341,8 @@ static void FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDes static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); static LocalCopyStatus GetLocalCopyStatus(void); static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); +static void EnsureHaveShardsToCopy(List *shardIntervalList, bool isHashDistributed, + char *relationName); static void LogLocalCopyToRelationExecution(uint64 shardId); static void LogLocalCopyToFileExecution(uint64 shardId); @@ -2023,6 +2025,36 @@ ShardIntervalListHasLocalPlacements(List *shardIntervalList) } +/* + * EnsureHaveShardsToCopy errors out if the given shard interval list + * is empty, i.e. there are no shards to copy into. + */ +static void +EnsureHaveShardsToCopy(List *shardIntervalList, bool isHashDistributed, + char *relationName) + { + if (shardIntervalList == NIL) + { + if (isHashDistributed) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find any shards into which to copy"), + errdetail("No shards exist for distributed table \"%s\".", + relationName), + errhint("Run master_create_worker_shards to create shards " + "and try again."))); + } + else + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find any shards into which to copy"), + errdetail("No shards exist for distributed table \"%s\".", + relationName))); + } + } +} + + /* * CitusCopyDestReceiverStartup implements the rStartup interface of * CitusCopyDestReceiver. It opens the relation, acquires necessary @@ -2056,26 +2088,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->tupleDescriptor = inputTupleDescriptor; /* load the list of shards and verify that we have shards to copy into */ - List *shardIntervalList = LoadShardIntervalList(tableId); - if (shardIntervalList == NIL) - { - if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED)) - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards into which to copy"), - errdetail("No shards exist for distributed table \"%s\".", - relationName), - errhint("Run master_create_worker_shards to create shards " - "and try again."))); - } - else - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards into which to copy"), - errdetail("No shards exist for distributed table \"%s\".", - relationName))); - } - } + List *shardIntervalList = LoadShardIntervalListWithRetry(tableId, ShareLock); + bool isHashDistributed = IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED); + EnsureHaveShardsToCopy(shardIntervalList, isHashDistributed, relationName); /* error if any shard missing min/max values */ if (cacheEntry->hasUninitializedShardInterval) @@ -2091,9 +2106,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } } - /* prevent concurrent placement changes and non-commutative DML statements */ - LockShardListMetadata(shardIntervalList, ShareLock); - /* * Prevent concurrent UPDATE/DELETE on replication factor >1 * (see AcquireExecutorMultiShardLocks() at multi_router_executor.c) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 5e4afd1a7..dba818274 100644 ---
a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -39,8 +39,9 @@ #include "distributed/function_call_delegation.h" #include "executor/executor.h" #include "nodes/makefuncs.h" -#include "optimizer/optimizer.h" #include "optimizer/clauses.h" +#include "optimizer/optimizer.h" +#include "optimizer/planner.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -61,6 +62,7 @@ static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int efla static void CitusPreExecScan(CitusScanState *scanState); static bool ModifyJobNeedsEvaluation(Job *workerJob); static void RegenerateTaskForFasthPathQuery(Job *workerJob); +static DistributedPlan * RePlanNonFastPathQuery(DistributedPlan *distributedPlan); static void RegenerateTaskListForInsert(Job *workerJob); static DistributedPlan * CopyDistributedPlanWithoutCache( DistributedPlan *originalDistributedPlan); @@ -412,12 +414,38 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) /* * In case of a split, the shard might no longer be available. In that - * case try to reroute. We can only do this for fast path queries. + * case try to reroute. Fast-path queries are simply rerouted. + * Non-fast-path queries have to be replanned, because their shard pruning + * depends directly on the postgres planner. (This could be optimized by feeding + * enough information from the planning phase to recompute the tasks, but that is more complex.) */ - if (currentPlan->fastPathRouterPlan && - !AnchorShardsInTaskListExist(workerJob->taskList)) + if (!AnchorShardsInTaskListExist(workerJob->taskList)) { - TryToRerouteFastPathModifyQuery(workerJob); + if (currentPlan->fastPathRouterPlan) + { + TryToRerouteFastPathModifyQuery(workerJob); + } + else + { + /* + * We should only replan if we have a valid topLevelQueryContext, which means + * this plan is a top-level plan (not a subplan). + */ + if (originalDistributedPlan->topLevelQueryContext) + { + DistributedPlan *newDistributedPlan = RePlanNonFastPathQuery( + originalDistributedPlan); + scanState->distributedPlan = newDistributedPlan; + + /* + * Switch back to the old context and restart CitusBeginModifyScan so that + * the tasks are regenerated for the new plan (e.g. due to deferredPruning). + */ + MemoryContextSwitchTo(oldContext); + CitusBeginModifyScan((CustomScanState *) scanState, estate, eflags); + return; + } + } } /* ensure there is no invalid shard */ @@ -658,6 +686,35 @@ RegenerateTaskForFasthPathQuery(Job *workerJob) } +/* + * RePlanNonFastPathQuery replans the original query that was stored in the + * distributed plan at the start of planning. + * + * This function should only be used when we detect a missing shard at + * execution time.
+ */ +static DistributedPlan * +RePlanNonFastPathQuery(DistributedPlan *oldPlan) +{ + Assert(!oldPlan->fastPathRouterPlan); + + /* extract top level query info from the TopLevelQueryContext stored in the old plan */ + TopLevelQueryContext *topLevelQueryContext = oldPlan->topLevelQueryContext; + + Query *originalQuery = topLevelQueryContext->query; + int cursorOptions = topLevelQueryContext->cursorOptions; + ParamListInfo boundParams = topLevelQueryContext->boundParams; + + /* replan the top level query */ + PlannedStmt *plannedStmt = planner(originalQuery, NULL, cursorOptions, boundParams); + + /* return the new plan */ + DistributedPlan *newDistributedPlan = GetDistributedPlan( + (CustomScan *) plannedStmt->planTree); + return newDistributedPlan; +} + + /* * AdaptiveExecutorCreateScan creates the scan state for the adaptive executor. */ diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index dce66965a..882c3f283 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1141,6 +1141,30 @@ LoadShardIntervalList(Oid relationId) } +/* + * LoadShardIntervalListWithRetry reloads the shard intervals of the given relation + * after acquiring metadata locks on its shards with the specified lockmode. + * + * This function should be called when a concurrent shard modification might happen + * and we want to minimize errors related to missing shards. + */ +List * +LoadShardIntervalListWithRetry(Oid relationId, LOCKMODE lockmode) +{ + List *shardList = LoadShardIntervalList(relationId); + + /* + * We block until conflicting locks on the shards are released and then reload the shards + * to minimize 'missing shard' errors. (Reloading cannot eliminate the error completely, it only reduces the likelihood.) + */ + LockShardListMetadata(shardList, lockmode); + + shardList = LoadShardIntervalList(relationId); + + return shardList; +} + + /* * LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a * given distributed table.
The function returns an empty list if no shards can be found diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index c6713399b..9d4b8ddfc 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -74,6 +74,9 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; +/* store the untransformed query and boundParams to be able to replan when necessary */ +static TopLevelQueryContext *TopLevelQueryCtx = NULL; + static void ErrorIfQueryHasMergeCommand(Query *queryTree); static bool ContainsMergeCommandWalker(Node *node); static bool ListContainsDistributedTableRTE(List *rangeTableList, @@ -130,7 +133,8 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); static void WarnIfListHasForeignDistributedTable(List *rangeTableList); - +static void PersistTopLevelQueryInfo(Query *query, ParamListInfo boundParams, + int cursorOptions); /* Distributed planner hook */ PlannedStmt * @@ -175,6 +179,20 @@ distributed_planner(Query *parse, } } + /* + * We should only store TopLevelQueryCtx when PlannerLevel = 0, + * because we want to store the top-level query info, which + * is required to replan if we find a missing shard for non-fast-path + * queries just before execution. + * + * TopLevelQueryCtx will point to the top-level query info even if we + * call the planner recursively. + */ + if (PlannerLevel == 0 && needsDistributedPlanning) + { + PersistTopLevelQueryInfo(parse, boundParams, cursorOptions); + } + int rteIdCounter = 1; DistributedPlanningContext planContext = { @@ -282,6 +300,24 @@ distributed_planner(Query *parse, /* remove the context from the context list */ PopPlannerRestrictionContext(); + /* + * When PlannerLevel = 0, we are sure that this is the top-level plan (the end of the planner recursion). + * We can then store the top-level query info inside the top-level Citus plan, if any. + */ + if (PlannerLevel == 0 && needsDistributedPlanning) + { + Assert(TopLevelQueryCtx); + + if (result && IsCitusCustomScan(result->planTree)) + { + DistributedPlan *distPlan = GetDistributedPlan( + (CustomScan *) result->planTree); + distPlan->topLevelQueryContext = TopLevelQueryCtx; + } + + TopLevelQueryCtx = NULL; + } + /* * In some cases, for example; parameterized SQL functions, we may miss that * there is a need for distributed planning. Such cases only become clear after @@ -302,6 +338,28 @@ distributed_planner(Query *parse, } +/* + * PersistTopLevelQueryInfo stores the given top-level query information in the global + * TopLevelQueryCtx so that we can replan if we detect missing shards just before execution.
+ */ +static void +PersistTopLevelQueryInfo(Query *query, ParamListInfo boundParams, int cursorOptions) +{ + /* ensure this is only called from the top-level planner */ + Assert(PlannerLevel == 0); + + /* keep the unmodified top-level query information for the duration of the transaction */ + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); + + TopLevelQueryCtx = palloc0(sizeof(TopLevelQueryContext)); + TopLevelQueryCtx->query = copyObject(query); + TopLevelQueryCtx->cursorOptions = cursorOptions; + TopLevelQueryCtx->boundParams = copyParamList(boundParams); + + MemoryContextSwitchTo(oldContext); +} + + /* * ErrorIfQueryHasMergeCommand walks over the query tree and throws error * if there are any Merge command (e.g., CMD_MERGE) in the query tree. diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 157e04dc4..cbf366fa5 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -278,7 +278,10 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); Oid targetRelationId = insertRte->relid; CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId); - int shardCount = targetCacheEntry->shardIntervalArrayLength; + + /* grab shared metadata lock to stop concurrent placement additions */ + List *shardIntervalList = LoadShardIntervalListWithRetry(targetRelationId, ShareLock); + RelationRestrictionContext *relationRestrictionContext = plannerRestrictionContext->relationRestrictionContext; bool allReferenceTables = relationRestrictionContext->allReferenceTables; @@ -310,11 +313,9 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, * the current shard boundaries. Finally, perform the normal shard pruning to * decide on whether to push the query to the current shard or not. */ - for (int shardOffset = 0; shardOffset < shardCount; shardOffset++) + ShardInterval *targetShardInterval = NULL; + foreach_ptr(targetShardInterval, shardIntervalList) { - ShardInterval *targetShardInterval = - targetCacheEntry->sortedShardIntervalArray[shardOffset]; - Task *modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetCacheEntry, targetShardInterval, @@ -798,9 +799,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, copyOfPlannerRestrictionContext->relationRestrictionContext-> relationRestrictionList; - /* grab shared metadata lock to stop concurrent placement additions */ - LockShardDistributionMetadata(shardId, ShareLock); - /* * Replace the partitioning qual parameter value in all baserestrictinfos. * Note that this has to be done on a copy, as the walker modifies in place. diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 31de463f0..e1787db19 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -178,6 +178,17 @@ typedef struct DistributedPlanningContext } DistributedPlanningContext; +/* + * TopLevelQueryContext is used to replan queries for missing shards just before execution. + */ +typedef struct TopLevelQueryContext +{ + Query *query; + int cursorOptions; + ParamListInfo boundParams; +} TopLevelQueryContext; + + /* * CitusCustomScanPath is injected into the planner during the combine query planning * phase of the logical planner.
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 81d95f868..18b24b467 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -280,6 +280,7 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS); /* Function declarations to read shard and shard placement data */ extern uint32 TableShardReplicationFactor(Oid relationId); extern List * LoadShardIntervalList(Oid relationId); +extern List * LoadShardIntervalListWithRetry(Oid relationId, LOCKMODE lockmode); extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId); extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId); extern int ShardIntervalCount(Oid relationId); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index a20085958..84bfd8e39 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -457,6 +457,13 @@ typedef struct DistributedPlan * or if prepared statement parameters prevented successful planning. */ DeferredErrorMessage *planningError; + + /* + * Used to replan the query just before execution in case any task has a missing + * shard. We try to replan so that the query succeeds instead of showing the + * user an unfriendly 'shard missing' error. + */ + TopLevelQueryContext *topLevelQueryContext; } DistributedPlan; diff --git a/src/test/regress/expected/isolation_blocking_shard_split.out b/src/test/regress/expected/isolation_blocking_shard_split.out index 86d50e3e3..edeb17e0c 100644 --- a/src/test/regress/expected/isolation_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_blocking_shard_split.out @@ -291,7 +291,6 @@ step s2-commit: COMMIT; step s1-copy: <... completed> -ERROR: could not find valid entry for shard xxxxx step s1-commit: COMMIT; @@ -308,14 +307,19 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500001|t | 0 + 57637|1500001|t | 4 57637|1500003|t | 0 - 57638|1500004|t | 0 + 57638|1500004|t | 1 (3 rows) id|value --------------------------------------------------------------------- -(0 rows) + 1| 1 + 2| 2 + 3| 3 + 4| 4 + 5| 5 +(5 rows) starting permutation: s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-update s2-commit s1-commit s2-print-cluster @@ -593,7 +597,6 @@ step s2-commit: COMMIT; step s1-copy: <...
completed> -ERROR: could not find valid entry for shard xxxxx step s1-commit: COMMIT; @@ -610,14 +613,19 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500001|t | 0 + 57637|1500001|t | 4 57637|1500003|t | 0 - 57638|1500004|t | 0 + 57638|1500004|t | 1 (3 rows) id|value --------------------------------------------------------------------- -(0 rows) + 1| 1 + 2| 2 + 3| 3 + 4| 4 + 5| 5 +(5 rows) starting permutation: s1-load-cache s1-insert s1-begin s1-blocking-shard-split s2-blocking-shard-split s1-commit s2-print-cluster diff --git a/src/test/regress/expected/isolation_tenant_isolation.out b/src/test/regress/expected/isolation_tenant_isolation.out index e2b2e4e43..09b1b9f05 100644 --- a/src/test/regress/expected/isolation_tenant_isolation.out +++ b/src/test/regress/expected/isolation_tenant_isolation.out @@ -143,7 +143,7 @@ id|value (0 rows) -starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-complex s2-commit s1-commit s2-print-cluster +starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-pushable-insert-select s2-commit s1-commit s2-print-cluster create_distributed_table --------------------------------------------------------------------- @@ -180,16 +180,13 @@ isolate_tenant_to_new_shard 1500016 (1 row) -step s1-update-complex: - UPDATE isolation_table SET value = 5 WHERE id IN ( - SELECT max(id) FROM isolation_table - ); +step s1-pushable-insert-select: + INSERT INTO isolation_table SELECT * FROM isolation_table; step s2-commit: COMMIT; -step s1-update-complex: <... completed> -ERROR: shard for the given value does not exist +step s1-pushable-insert-select: <... completed> step s1-commit: COMMIT; @@ -207,7 +204,7 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- 57637|1500015|t | 0 - 57637|1500016|t | 1 + 57637|1500016|t | 2 57637|1500017|t | 0 57638|1500014|t | 0 (4 rows) @@ -215,7 +212,8 @@ nodeport|shardid|success|result id|value --------------------------------------------------------------------- 5| 10 -(1 row) + 5| 10 +(2 rows) starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-insert s2-commit s1-commit s2-print-cluster @@ -287,7 +285,7 @@ id|value (1 row) -starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-copy s2-commit s1-commit s2-print-cluster +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-nonpushable-insert-select-with-copy s2-commit s1-commit s2-print-cluster create_distributed_table --------------------------------------------------------------------- @@ -321,14 +319,13 @@ isolate_tenant_to_new_shard 1500028 (1 row) -step s1-copy: - COPY isolation_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; +step s1-nonpushable-insert-select-with-copy: + INSERT INTO isolation_table SELECT 5; step s2-commit: COMMIT; -step s1-copy: <... completed> -ERROR: could not find valid entry for shard xxxxx +step s1-nonpushable-insert-select-with-copy: <... 
completed> step s1-commit: COMMIT; @@ -346,17 +343,18 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- 57637|1500027|t | 0 - 57637|1500028|t | 0 + 57637|1500028|t | 1 57637|1500029|t | 0 57638|1500026|t | 0 (4 rows) id|value --------------------------------------------------------------------- -(0 rows) + 5| +(1 row) -starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-ddl s2-commit s1-commit s2-print-cluster s2-print-index-count +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-nonpushable-insert-select-returning s2-commit s1-commit s2-print-cluster create_distributed_table --------------------------------------------------------------------- @@ -390,6 +388,222 @@ isolate_tenant_to_new_shard 1500034 (1 row) +step s1-nonpushable-insert-select-returning: + INSERT INTO isolation_table SELECT 5 RETURNING *; + +step s2-commit: + COMMIT; + +step s1-nonpushable-insert-select-returning: <... completed> +id|value +--------------------------------------------------------------------- + 5| +(1 row) + +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500033|t | 0 + 57637|1500034|t | 1 + 57637|1500035|t | 0 + 57638|1500032|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| +(1 row) + + +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-nonpushable-insert-select-conflict s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + TRUNCATE isolation_table; + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500040 +(1 row) + +step s1-nonpushable-insert-select-conflict: + INSERT INTO isolation_table SELECT 5 ON CONFLICT DO NOTHING; + +step s2-commit: + COMMIT; + +step s1-nonpushable-insert-select-conflict: <... 
completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500039|t | 0 + 57637|1500040|t | 1 + 57637|1500041|t | 0 + 57638|1500038|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| +(1 row) + + +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-copy s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + TRUNCATE isolation_table; + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500046 +(1 row) + +step s1-copy: + COPY isolation_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + +step s2-commit: + COMMIT; + +step s1-copy: <... completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500045|t | 1 + 57637|1500046|t | 1 + 57637|1500047|t | 2 + 57638|1500044|t | 1 +(4 rows) + +id|value +--------------------------------------------------------------------- + 1| 1 + 2| 2 + 3| 3 + 4| 4 + 5| 5 +(5 rows) + + +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-isolate-tenant s1-ddl s2-commit s1-commit s2-print-cluster s2-print-index-count +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + TRUNCATE isolation_table; + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500052 +(1 row) + step s1-ddl: CREATE INDEX test_table_index ON isolation_table(id); @@ -413,10 +627,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500033|t | 0 - 57637|1500034|t | 0 - 57637|1500035|t | 0 
- 57638|1500032|t | 0 + 57637|1500051|t | 0 + 57637|1500052|t | 0 + 57637|1500053|t | 0 + 57638|1500050|t | 0 (4 rows) id|value @@ -471,7 +685,7 @@ step s2-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500040 + 1500058 (1 row) step s1-update: @@ -497,10 +711,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500039|t | 0 - 57637|1500040|t | 1 - 57637|1500041|t | 0 - 57638|1500038|t | 0 + 57637|1500057|t | 0 + 57637|1500058|t | 1 + 57637|1500059|t | 0 + 57638|1500056|t | 0 (4 rows) id|value @@ -540,7 +754,7 @@ step s2-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500046 + 1500064 (1 row) step s1-delete: @@ -566,10 +780,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500045|t | 0 - 57637|1500046|t | 0 - 57637|1500047|t | 0 - 57638|1500044|t | 0 + 57637|1500063|t | 0 + 57637|1500064|t | 0 + 57637|1500065|t | 0 + 57638|1500062|t | 0 (4 rows) id|value @@ -577,7 +791,7 @@ id|value (0 rows) -starting permutation: s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-complex s2-commit s1-commit s2-print-cluster +starting permutation: s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-pushable-insert-select s2-commit s1-commit s2-print-cluster create_distributed_table --------------------------------------------------------------------- @@ -608,19 +822,16 @@ step s2-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500052 + 1500070 (1 row) -step s1-update-complex: - UPDATE isolation_table SET value = 5 WHERE id IN ( - SELECT max(id) FROM isolation_table - ); +step s1-pushable-insert-select: + INSERT INTO isolation_table SELECT * FROM isolation_table; step s2-commit: COMMIT; -step s1-update-complex: <... completed> -ERROR: shard for the given value does not exist +step s1-pushable-insert-select: <... 
completed> step s1-commit: COMMIT; @@ -637,16 +848,17 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500051|t | 0 - 57637|1500052|t | 1 - 57637|1500053|t | 0 - 57638|1500050|t | 0 + 57637|1500069|t | 0 + 57637|1500070|t | 2 + 57637|1500071|t | 0 + 57638|1500068|t | 0 (4 rows) id|value --------------------------------------------------------------------- 5| 10 -(1 row) + 5| 10 +(2 rows) starting permutation: s1-begin s1-select s2-begin s2-isolate-tenant s1-insert s2-commit s1-commit s2-print-cluster @@ -677,7 +889,7 @@ step s2-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500058 + 1500076 (1 row) step s1-insert: @@ -703,10 +915,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500057|t | 0 - 57637|1500058|t | 1 - 57637|1500059|t | 0 - 57638|1500056|t | 0 + 57637|1500075|t | 0 + 57637|1500076|t | 1 + 57637|1500077|t | 0 + 57638|1500074|t | 0 (4 rows) id|value @@ -715,6 +927,209 @@ id|value (1 row) +starting permutation: s1-begin s1-select s2-begin s2-isolate-tenant s1-nonpushable-insert-select-with-copy s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500082 +(1 row) + +step s1-nonpushable-insert-select-with-copy: + INSERT INTO isolation_table SELECT 5; + +step s2-commit: + COMMIT; + +step s1-nonpushable-insert-select-with-copy: <... 
completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500081|t | 0 + 57637|1500082|t | 1 + 57637|1500083|t | 0 + 57638|1500080|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| +(1 row) + + +starting permutation: s1-begin s1-select s2-begin s2-isolate-tenant s1-nonpushable-insert-select-returning s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500088 +(1 row) + +step s1-nonpushable-insert-select-returning: + INSERT INTO isolation_table SELECT 5 RETURNING *; + +step s2-commit: + COMMIT; + +step s1-nonpushable-insert-select-returning: <... completed> +id|value +--------------------------------------------------------------------- + 5| +(1 row) + +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500087|t | 0 + 57637|1500088|t | 1 + 57637|1500089|t | 0 + 57638|1500086|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| +(1 row) + + +starting permutation: s1-begin s1-select s2-begin s2-isolate-tenant s1-nonpushable-insert-select-conflict s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500094 +(1 row) + +step s1-nonpushable-insert-select-conflict: + INSERT INTO isolation_table SELECT 5 ON CONFLICT DO NOTHING; + +step s2-commit: + COMMIT; + +step s1-nonpushable-insert-select-conflict: <... 
completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500093|t | 0 + 57637|1500094|t | 1 + 57637|1500095|t | 0 + 57638|1500092|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| +(1 row) + + starting permutation: s1-begin s1-select s2-begin s2-isolate-tenant s1-copy s2-commit s1-commit s2-print-cluster create_distributed_table --------------------------------------------------------------------- @@ -743,7 +1158,7 @@ step s2-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500064 + 1500100 (1 row) step s1-copy: @@ -753,7 +1168,6 @@ step s2-commit: COMMIT; step s1-copy: <... completed> -ERROR: could not find valid entry for shard xxxxx step s1-commit: COMMIT; @@ -770,15 +1184,20 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500063|t | 0 - 57637|1500064|t | 0 - 57637|1500065|t | 0 - 57638|1500062|t | 0 + 57637|1500099|t | 1 + 57637|1500100|t | 1 + 57637|1500101|t | 2 + 57638|1500098|t | 1 (4 rows) id|value --------------------------------------------------------------------- -(0 rows) + 1| 1 + 2| 2 + 3| 3 + 4| 4 + 5| 5 +(5 rows) starting permutation: s1-begin s1-select s2-begin s2-isolate-tenant s1-ddl s2-commit s1-commit s2-print-cluster s2-print-index-count @@ -809,7 +1228,7 @@ step s2-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500070 + 1500106 (1 row) step s1-ddl: @@ -835,10 +1254,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500069|t | 0 - 57637|1500070|t | 0 - 57637|1500071|t | 0 - 57638|1500068|t | 0 + 57637|1500105|t | 0 + 57637|1500106|t | 0 + 57637|1500107|t | 0 + 57638|1500104|t | 0 (4 rows) id|value @@ -862,6 +1281,439 @@ nodeport|success|result (4 rows) +starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-recursive s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + TRUNCATE isolation_table; + +step s1-insert: + INSERT INTO isolation_table VALUES (5, 10); + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500112 +(1 row) + +step s1-update-recursive: + UPDATE isolation_table SET value = 5 WHERE id IN ( + SELECT max(id) FROM isolation_table + ); + +step s2-commit: + COMMIT; + +step s1-update-recursive: <... 
completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500111|t | 0 + 57637|1500112|t | 1 + 57637|1500113|t | 0 + 57638|1500110|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| 5 +(1 row) + + +starting permutation: s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-recursive s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + INSERT INTO isolation_table VALUES (5, 10); + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500118 +(1 row) + +step s1-update-recursive: + UPDATE isolation_table SET value = 5 WHERE id IN ( + SELECT max(id) FROM isolation_table + ); + +step s2-commit: + COMMIT; + +step s1-update-recursive: <... completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500117|t | 0 + 57637|1500118|t | 1 + 57637|1500119|t | 0 + 57638|1500116|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| 5 +(1 row) + + +starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-cte s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + TRUNCATE isolation_table; + +step s1-insert: + INSERT INTO isolation_table VALUES (5, 10); + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500124 +(1 row) + +step s1-update-cte: + -- here update cte is subplan of the select plan. 
We do not replan a subplan if a shard is missing at CitusBeginModifyScan + WITH cte_1 AS (UPDATE isolation_table SET value = value RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2; + +step s2-commit: + COMMIT; + +step s1-update-cte: <... completed> +ERROR: shard for the given value does not exist +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500123|t | 0 + 57637|1500124|t | 1 + 57637|1500125|t | 0 + 57638|1500122|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| 10 +(1 row) + + +starting permutation: s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-cte s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + INSERT INTO isolation_table VALUES (5, 10); + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500130 +(1 row) + +step s1-update-cte: + -- here update cte is subplan of the select plan. We do not replan a subplan if a shard is missing at CitusBeginModifyScan + WITH cte_1 AS (UPDATE isolation_table SET value = value RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2; + +step s2-commit: + COMMIT; + +step s1-update-cte: <... 
completed> +ERROR: shard for the given value does not exist +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500129|t | 0 + 57637|1500130|t | 1 + 57637|1500131|t | 0 + 57638|1500128|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| 10 +(1 row) + + +starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-select-cte s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + TRUNCATE isolation_table; + +step s1-insert: + INSERT INTO isolation_table VALUES (5, 10); + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500136 +(1 row) + +step s1-update-select-cte: + -- here update plan is a top-level plan of the select cte. We can replan a top-level plan if a shard is missing at CitusBeginModifyScan + WITH cte_1 AS ( SELECT * FROM isolation_table ORDER BY 1,2 ) UPDATE isolation_table SET value = value WHERE EXISTS(SELECT * FROM cte_1); + +step s2-commit: + COMMIT; + +step s1-update-select-cte: <... 
completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500135|t | 0 + 57637|1500136|t | 1 + 57637|1500137|t | 0 + 57638|1500134|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| 10 +(1 row) + + +starting permutation: s1-insert s1-begin s1-select s2-begin s2-isolate-tenant s1-update-select-cte s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + INSERT INTO isolation_table VALUES (5, 10); + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM isolation_table WHERE id = 5; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-isolate-tenant: + SELECT isolate_tenant_to_new_shard('isolation_table', 5, shard_transfer_mode => 'block_writes'); + +isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 1500142 +(1 row) + +step s1-update-select-cte: + -- here update plan is a top-level plan of the select cte. We can replan a top-level plan if a shard is missing at CitusBeginModifyScan + WITH cte_1 AS ( SELECT * FROM isolation_table ORDER BY 1,2 ) UPDATE isolation_table SET value = value WHERE EXISTS(SELECT * FROM cte_1); + +step s2-commit: + COMMIT; + +step s1-update-select-cte: <... 
completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('isolation_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM isolation_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500141|t | 0 + 57637|1500142|t | 1 + 57637|1500143|t | 0 + 57638|1500140|t | 0 +(4 rows) + +id|value +--------------------------------------------------------------------- + 5| 10 +(1 row) + + starting permutation: s1-load-cache s1-insert s1-begin s1-isolate-tenant s2-isolate-tenant s1-commit s2-print-cluster create_distributed_table --------------------------------------------------------------------- @@ -885,7 +1737,7 @@ step s1-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500076 + 1500148 (1 row) step s2-isolate-tenant: @@ -908,10 +1760,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500073|t | 1 - 57638|1500075|t | 0 - 57638|1500076|t | 0 - 57638|1500077|t | 0 + 57637|1500145|t | 1 + 57638|1500147|t | 0 + 57638|1500148|t | 0 + 57638|1500149|t | 0 (4 rows) id|value @@ -940,7 +1792,7 @@ step s1-isolate-tenant: isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500082 + 1500154 (1 row) step s2-isolate-tenant: @@ -963,10 +1815,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500079|t | 1 - 57638|1500081|t | 0 - 57638|1500082|t | 0 - 57638|1500083|t | 0 + 57637|1500151|t | 1 + 57638|1500153|t | 0 + 57638|1500154|t | 0 + 57638|1500155|t | 0 (4 rows) id|value diff --git a/src/test/regress/expected/isolation_tenant_isolation_nonblocking.out b/src/test/regress/expected/isolation_tenant_isolation_nonblocking.out index 280e3183e..6638686ba 100644 --- a/src/test/regress/expected/isolation_tenant_isolation_nonblocking.out +++ b/src/test/regress/expected/isolation_tenant_isolation_nonblocking.out @@ -59,7 +59,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500090 + 1500162 (1 row) step s2-commit: @@ -78,10 +78,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500089|t | 0 - 57637|1500090|t | 1 - 57637|1500091|t | 0 - 57638|1500086|t | 0 + 57637|1500161|t | 0 + 57637|1500162|t | 1 + 57637|1500163|t | 0 + 57638|1500158|t | 0 (4 rows) id|value @@ -149,7 +149,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500098 + 1500170 (1 row) step s2-commit: @@ -168,10 +168,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500097|t | 0 - 57637|1500098|t | 0 - 57637|1500099|t | 0 - 57638|1500094|t | 0 + 57637|1500169|t | 0 + 57637|1500170|t | 0 + 57637|1500171|t | 0 + 57638|1500166|t | 0 (4 rows) id|value @@ -240,7 +240,7 @@ t step s2-isolate-tenant: <... 
completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500106 + 1500178 (1 row) step s2-commit: @@ -259,10 +259,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500105|t | 0 - 57637|1500106|t | 1 - 57637|1500107|t | 0 - 57638|1500102|t | 0 + 57637|1500177|t | 0 + 57637|1500178|t | 1 + 57637|1500179|t | 0 + 57638|1500174|t | 0 (4 rows) id|value @@ -327,7 +327,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500114 + 1500186 (1 row) step s2-commit: @@ -346,10 +346,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500113|t | 0 - 57637|1500114|t | 1 - 57637|1500115|t | 0 - 57638|1500110|t | 0 + 57637|1500185|t | 0 + 57637|1500186|t | 1 + 57637|1500187|t | 0 + 57638|1500182|t | 0 (4 rows) id|value @@ -413,7 +413,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500122 + 1500194 (1 row) step s2-commit: @@ -432,10 +432,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500121|t | 1 - 57637|1500122|t | 1 - 57637|1500123|t | 2 - 57638|1500118|t | 1 + 57637|1500193|t | 1 + 57637|1500194|t | 1 + 57637|1500195|t | 2 + 57638|1500190|t | 1 (4 rows) id|value @@ -503,7 +503,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500130 + 1500202 (1 row) step s2-commit: @@ -522,10 +522,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500129|t | 0 - 57637|1500130|t | 1 - 57637|1500131|t | 0 - 57638|1500126|t | 0 + 57637|1500201|t | 0 + 57637|1500202|t | 1 + 57637|1500203|t | 0 + 57638|1500198|t | 0 (4 rows) id|value @@ -589,7 +589,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500138 + 1500210 (1 row) step s2-commit: @@ -608,10 +608,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500137|t | 0 - 57637|1500138|t | 0 - 57637|1500139|t | 0 - 57638|1500134|t | 0 + 57637|1500209|t | 0 + 57637|1500210|t | 0 + 57637|1500211|t | 0 + 57638|1500206|t | 0 (4 rows) id|value @@ -676,7 +676,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500146 + 1500218 (1 row) step s2-commit: @@ -695,10 +695,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500145|t | 0 - 57637|1500146|t | 1 - 57637|1500147|t | 0 - 57638|1500142|t | 0 + 57637|1500217|t | 0 + 57637|1500218|t | 1 + 57637|1500219|t | 0 + 57638|1500214|t | 0 (4 rows) id|value @@ -759,7 +759,7 @@ t step s2-isolate-tenant: <... 
completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500154 + 1500226 (1 row) step s2-commit: @@ -778,10 +778,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500153|t | 0 - 57637|1500154|t | 1 - 57637|1500155|t | 0 - 57638|1500150|t | 0 + 57637|1500225|t | 0 + 57637|1500226|t | 1 + 57637|1500227|t | 0 + 57638|1500222|t | 0 (4 rows) id|value @@ -841,7 +841,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500162 + 1500234 (1 row) step s2-commit: @@ -860,10 +860,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500161|t | 1 - 57637|1500162|t | 1 - 57637|1500163|t | 2 - 57638|1500158|t | 1 + 57637|1500233|t | 1 + 57637|1500234|t | 1 + 57637|1500235|t | 2 + 57638|1500230|t | 1 (4 rows) id|value @@ -916,7 +916,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500170 + 1500242 (1 row) step s2-print-cluster: @@ -932,10 +932,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500169|t | 0 - 57637|1500170|t | 1 - 57637|1500171|t | 0 - 57638|1500166|t | 0 + 57637|1500241|t | 0 + 57637|1500242|t | 1 + 57637|1500243|t | 0 + 57638|1500238|t | 0 (4 rows) id|value @@ -984,7 +984,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500178 + 1500250 (1 row) step s2-print-cluster: @@ -1000,10 +1000,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500177|t | 0 - 57637|1500178|t | 1 - 57637|1500179|t | 0 - 57638|1500174|t | 0 + 57637|1500249|t | 0 + 57637|1500250|t | 1 + 57637|1500251|t | 0 + 57638|1500246|t | 0 (4 rows) id|value @@ -1052,7 +1052,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500186 + 1500258 (1 row) step s2-print-cluster: @@ -1068,10 +1068,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500185|t | 0 - 57637|1500186|t | 1 - 57637|1500187|t | 0 - 57638|1500182|t | 0 + 57637|1500257|t | 0 + 57637|1500258|t | 1 + 57637|1500259|t | 0 + 57638|1500254|t | 0 (4 rows) id|value @@ -1123,7 +1123,7 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500197 + 1500269 (1 row) step s2-commit: @@ -1142,10 +1142,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500196|t | 0 - 57637|1500197|t | 1 - 57637|1500198|t | 0 - 57638|1500193|t | 0 + 57637|1500268|t | 0 + 57637|1500269|t | 1 + 57637|1500270|t | 0 + 57638|1500265|t | 0 (4 rows) id|value @@ -1193,13 +1193,13 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500208 + 1500280 (1 row) step s1-isolate-tenant-no-same-coloc-blocking: <... 
completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500211 + 1500283 (1 row) step s2-print-cluster: @@ -1215,10 +1215,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500207|t | 0 - 57637|1500208|t | 1 - 57637|1500209|t | 0 - 57638|1500204|t | 0 + 57637|1500279|t | 0 + 57637|1500280|t | 1 + 57637|1500281|t | 0 + 57638|1500276|t | 0 (4 rows) id|value @@ -1266,13 +1266,13 @@ t step s2-isolate-tenant: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500219 + 1500291 (1 row) step s1-isolate-tenant-no-same-coloc-blocking: <... completed> isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1500222 + 1500294 (1 row) step s2-print-cluster: @@ -1288,10 +1288,10 @@ step s2-print-cluster: nodeport|shardid|success|result --------------------------------------------------------------------- - 57637|1500218|t | 0 - 57637|1500219|t | 1 - 57637|1500220|t | 0 - 57638|1500215|t | 0 + 57637|1500290|t | 0 + 57637|1500291|t | 1 + 57637|1500292|t | 0 + 57638|1500287|t | 0 (4 rows) id|value diff --git a/src/test/regress/spec/isolation_tenant_isolation.spec b/src/test/regress/spec/isolation_tenant_isolation.spec index 5db86685e..17bf40477 100644 --- a/src/test/regress/spec/isolation_tenant_isolation.spec +++ b/src/test/regress/spec/isolation_tenant_isolation.spec @@ -39,18 +39,50 @@ step "s1-insert" INSERT INTO isolation_table VALUES (5, 10); } +step "s1-pushable-insert-select" +{ + INSERT INTO isolation_table SELECT * FROM isolation_table; +} + +step "s1-nonpushable-insert-select-with-copy" +{ + INSERT INTO isolation_table SELECT 5; +} + +step "s1-nonpushable-insert-select-returning" +{ + INSERT INTO isolation_table SELECT 5 RETURNING *; +} + +step "s1-nonpushable-insert-select-conflict" +{ + INSERT INTO isolation_table SELECT 5 ON CONFLICT DO NOTHING; +} + step "s1-update" { UPDATE isolation_table SET value = 5 WHERE id = 5; } -step "s1-update-complex" +step "s1-update-recursive" { UPDATE isolation_table SET value = 5 WHERE id IN ( SELECT max(id) FROM isolation_table ); } +step "s1-update-cte" +{ + -- here the UPDATE CTE is a subplan of the SELECT plan; we do not replan a subplan if a shard is missing at CitusBeginModifyScan + WITH cte_1 AS (UPDATE isolation_table SET value = value RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2; +} + +step "s1-update-select-cte" +{ + -- here the UPDATE plan is the top-level plan and the SELECT CTE is its subplan.
We can replan a top-level plan if a shard is missing at CitusBeginModifyScan + WITH cte_1 AS ( SELECT * FROM isolation_table ORDER BY 1,2 ) UPDATE isolation_table SET value = value WHERE EXISTS(SELECT * FROM cte_1); +} + step "s1-delete" { DELETE FROM isolation_table WHERE id = 5; @@ -123,23 +155,45 @@ step "s2-print-index-count" } // run tenant isolation while concurrently performing an DML and index creation -// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed +// we expect DML/DDL queries to be rerouted to valid shards even if the shard they are waiting for is destroyed permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" -permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-complex" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-pushable-insert-select" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-with-copy" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-returning" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-conflict" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" - // the same tests without loading the cache at first permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" -permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-complex" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-pushable-insert-select" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-with-copy" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-returning" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-nonpushable-insert-select-conflict" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" permutation "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-ddl" 
"s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" +// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed +permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-recursive" "s2-commit" "s1-commit" "s2-print-cluster" + +// the same tests without loading the cache at first +permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-recursive" "s2-commit" "s1-commit" "s2-print-cluster" + +// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed +permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-cte" "s2-commit" "s1-commit" "s2-print-cluster" + +// the same tests without loading the cache at first +permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-cte" "s2-commit" "s1-commit" "s2-print-cluster" + +// we expect DML/DDL queries to fail because the shard they are waiting for is destroyed +permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-select-cte" "s2-commit" "s1-commit" "s2-print-cluster" + +// the same tests without loading the cache at first +permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-isolate-tenant" "s1-update-select-cte" "s2-commit" "s1-commit" "s2-print-cluster" // concurrent tenant isolation blocks on different shards of the same table (or any colocated table) permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-isolate-tenant" "s2-isolate-tenant" "s1-commit" "s2-print-cluster"