From 10f06ad7536b143446080d690e496195c3d2d727 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 23 Apr 2021 12:36:20 +0200 Subject: [PATCH] Fetch shard size on the fly for the rebalance monitor Without this change the rebalancer progress monitor gets the shard sizes from the `shardlength` column in `pg_dist_placement`. This column needs to be updated manually by calling `citus_update_table_statistics`. However, `citus_update_table_statistics` could lead to distributed deadlocks while database traffic is on-going (see #4752). To work around this we don't use `shardlength` column anymore. Instead for every rebalance we now fetch all shard sizes on the fly. Two additional things this does are: 1. It adds tests for the rebalance progress function. 2. If a shard move cannot be done because a source or target node is unreachable, then we error in stop the rebalance, instead of showing a warning and continuing. When using the by_disk_size rebalance strategy it's not safe to continue with other moves if a specific move failed. It's possible that the failed move made space for the next move, and because the failed move never happened this space now does not exist. 3. Adds two new columns to the result of `get_rebalancer_progress` which shows the size of the shard on the source and target node. Fixes #4930 --- .../distributed/operations/shard_rebalancer.c | 498 +++++++++++++++++- src/backend/distributed/shared_library_init.c | 13 + .../distributed/sql/citus--10.0-3--10.1-1.sql | 2 + .../distributed/sql/citus--8.3-1--9.0-1.sql | 19 +- .../sql/downgrades/citus--10.1-1--10.0-3.sql | 5 + .../udfs/get_rebalance_progress/10.1-1.sql | 18 + .../sql/udfs/get_rebalance_progress/9.0-1.sql | 17 + .../udfs/get_rebalance_progress/latest.sql | 18 + src/include/distributed/shard_rebalancer.h | 2 +- .../expected/isolation_shard_rebalancer.out | 8 +- .../isolation_shard_rebalancer_progress.out | 127 +++++ src/test/regress/expected/multi_extension.out | 6 +- .../regress/expected/shard_rebalancer.out | 54 +- src/test/regress/isolation_schedule | 1 + src/test/regress/pg_regress_multi.pl | 2 + .../spec/isolation_shard_rebalancer.spec | 4 +- .../isolation_shard_rebalancer_progress.spec | 84 +++ .../regress/spec/shared_connection_waits.spec | 1 + src/test/regress/sql/shard_rebalancer.sql | 28 +- 19 files changed, 842 insertions(+), 65 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/get_rebalance_progress/10.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/get_rebalance_progress/9.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql create mode 100644 src/test/regress/expected/isolation_shard_rebalancer_progress.out create mode 100644 src/test/regress/spec/isolation_shard_rebalancer_progress.spec diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index d40e198be..4e3791de1 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -31,6 +31,7 @@ #include "distributed/enterprise.h" #include "distributed/hash_helpers.h" #include "distributed/listutils.h" +#include "distributed/lock_graph.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" @@ -139,11 +140,52 @@ typedef struct RebalanceContext FmgrInfo shardAllowedOnNodeUDF; } RebalanceContext; +/* WorkerHashKey contains hostname and port to be used as a key in a hash */ +typedef struct WorkerHashKey +{ + char hostname[MAX_NODE_LENGTH]; + int port; +} WorkerHashKey; + +/* WorkerShardIds represents a set of shardIds grouped by worker */ +typedef struct WorkerShardIds +{ + WorkerHashKey worker; + + /* This is a uint64 hashset representing the shard ids for a specific worker */ + HTAB *shardIds; +} WorkerShardIds; + +/* ShardStatistics contains statistics about a shard */ +typedef struct ShardStatistics +{ + uint64 shardId; + + /* The shard its size in bytes. */ + uint64 totalSize; +} ShardStatistics; + +/* + * WorkerShardStatistics represents a set of statistics about shards, + * grouped by worker. + */ +typedef struct WorkerShardStatistics +{ + WorkerHashKey worker; + + /* + * Statistics for each shard on this worker: + * key: shardId + * value: ShardStatistics + */ + HTAB *statistics; +} WorkerShardStatistics; + /* static declarations for main logic */ static int ShardActivePlacementCount(HTAB *activePlacementsHash, uint64 shardId, List *activeWorkerNodeList); -static bool UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, +static void UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, List *responsiveNodeList, Oid shardReplicationModeOid); /* static declarations for main logic's utility functions */ @@ -192,6 +234,17 @@ static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name); static void EnsureShardCostUDF(Oid functionOid); static void EnsureNodeCapacityUDF(Oid functionOid); static void EnsureShardAllowedOnNodeUDF(Oid functionOid); +static void ConflictShardPlacementUpdateOnlyWithIsolationTesting(uint64 shardId); +static HTAB * BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps, + int stepCount); +static HTAB * GetShardStatistics(MultiConnection *connection, HTAB *shardIds); +static HTAB * GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, + int stepCount, bool fromSource); +static uint64 WorkerShardSize(HTAB *workerShardStatistics, + char *workerName, int workerPort, uint64 shardId); +static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int workerPort, + uint64 shardId); +static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(rebalance_table_shards); @@ -204,8 +257,17 @@ PG_FUNCTION_INFO_V1(citus_shard_cost_by_disk_size); PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions); PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); +bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; +/* + * This is randomly generated hardcoded number. It's used as the first part of + * the advisory lock identifier that's used during isolation tests. See the + * comments on ConflictShardPlacementUpdateOnlyWithIsolationTesting, for more + * information. + */ +#define SHARD_PLACEMENT_UPDATE_ADVISORY_LOCK_FIRST_KEY 29279 + #ifdef USE_ASSERT_CHECKING @@ -464,8 +526,7 @@ NodeCapacity(WorkerNode *workerNode, void *voidContext) static ShardCost GetShardCost(uint64 shardId, void *voidContext) { - ShardCost shardCost; - memset_struct_0(shardCost); + ShardCost shardCost = { 0 }; shardCost.shardId = shardId; RebalanceContext *context = voidContext; Datum shardCostDatum = FunctionCall1(&context->shardCostUDF, UInt64GetDatum(shardId)); @@ -698,8 +759,6 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) event->shardId = colocatedUpdate->shardId; event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; - event->shardSize = ShardLength(colocatedUpdate->shardId); - eventIndex++; } } @@ -1015,19 +1074,35 @@ get_rebalance_progress(PG_FUNCTION_ARGS) List *rebalanceMonitorList = ProgressMonitorList(REBALANCE_ACTIVITY_MAGIC_NUMBER, &segmentList); + foreach(rebalanceMonitorCell, rebalanceMonitorList) { ProgressMonitorData *monitor = lfirst(rebalanceMonitorCell); PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps; - + HTAB *shardStatistics = BuildWorkerShardStatisticsHash(monitor->steps, + monitor->stepCount); + HTAB *shardSizes = BuildShardSizesHash(monitor, shardStatistics); for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++) { PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex; uint64 shardId = step->shardId; ShardInterval *shardInterval = LoadShardInterval(shardId); - Datum values[9]; - bool nulls[9]; + uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName, + step->sourcePort, shardId); + uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName, + step->targetPort, shardId); + + uint64 shardSize = 0; + ShardStatistics *shardSizesStat = + hash_search(shardSizes, &shardId, HASH_FIND, NULL); + if (shardSizesStat) + { + shardSize = shardSizesStat->totalSize; + } + + Datum values[11]; + bool nulls[11]; memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); @@ -1035,12 +1110,14 @@ get_rebalance_progress(PG_FUNCTION_ARGS) values[0] = monitor->processId; values[1] = ObjectIdGetDatum(shardInterval->relationId); values[2] = UInt64GetDatum(shardId); - values[3] = UInt64GetDatum(step->shardSize); + values[3] = UInt64GetDatum(shardSize); values[4] = PointerGetDatum(cstring_to_text(step->sourceName)); values[5] = UInt32GetDatum(step->sourcePort); values[6] = PointerGetDatum(cstring_to_text(step->targetName)); values[7] = UInt32GetDatum(step->targetPort); values[8] = UInt64GetDatum(step->progress); + values[9] = UInt64GetDatum(sourceSize); + values[10] = UInt64GetDatum(targetSize); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } @@ -1054,6 +1131,348 @@ get_rebalance_progress(PG_FUNCTION_ARGS) } +/* + * BuildShardSizesHash creates a hash that maps a shardid to its full size + * within the cluster. It does this by using the rebalance progress monitor + * state to find the node the shard is currently on. It then looks up the shard + * size in the shardStatistics hashmap for this node. + */ +static HTAB * +BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) +{ + HASHCTL info = { + .keysize = sizeof(uint64), + .entrysize = sizeof(ShardStatistics), + .hcxt = CurrentMemoryContext + }; + + HTAB *shardSizes = hash_create( + "ShardSizeHash", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps; + for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++) + { + PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex; + uint64 shardId = step->shardId; + uint64 shardSize = 0; + uint64 backupShardSize = 0; + + uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName, + step->sourcePort, shardId); + uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName, + step->targetPort, shardId); + + + if (step->progress == REBALANCE_PROGRESS_WAITING || + step->progress == REBALANCE_PROGRESS_MOVING) + { + /* + * If we are not done with the move, the correct shard size is the + * size on the source. + */ + shardSize = sourceSize; + backupShardSize = targetSize; + } + else if (step->progress == REBALANCE_PROGRESS_MOVED) + { + /* + * If we are done with the move, the correct shard size is the size + * on the target + */ + shardSize = targetSize; + backupShardSize = sourceSize; + } + + if (shardSize == 0) + { + if (backupShardSize == 0) + { + /* + * We don't have any useful shard size. This can happen when a + * shard is moved multiple times and it is not present on + * either of these nodes. Probably the shard is on a worker + * related to another event. In the weird case that this shard + * is on the nodes and actually is size 0, we will have no + * entry in the hashmap. When fetching from it we always + * default to 0 if no entry is found, so that's fine. + */ + continue; + } + + /* + * There exist some race conditions where it's possible that the + * the state of the steps we see in shared memory are a bit behind + * what is actually going on. So it is possible that even though a + * step is reported as still being in the MOVING state, the shard + * move might have just finished completing. This in turn can mean + * that the source size is 0 while the target size is not. We try + * to handle such rare edge cases by falling back on the other + * shard size if that one is not 0. + */ + shardSize = backupShardSize; + } + + + ShardStatistics *currentWorkerStatistics = + hash_search(shardSizes, &shardId, HASH_ENTER, NULL); + currentWorkerStatistics->totalSize = shardSize; + } + return shardSizes; +} + + +/* + * WorkerShardSize returns the size of a shard in bytes on a worker, based on + * the workerShardStatisticsHash. + */ +static uint64 +WorkerShardSize(HTAB *workerShardStatisticsHash, char *workerName, int workerPort, + uint64 shardId) +{ + WorkerHashKey workerKey = { 0 }; + strlcpy(workerKey.hostname, workerName, MAX_NODE_LENGTH); + workerKey.port = workerPort; + + WorkerShardStatistics *workerStats = + hash_search(workerShardStatisticsHash, &workerKey, HASH_FIND, NULL); + if (!workerStats) + { + return 0; + } + + ShardStatistics *shardStats = + hash_search(workerStats->statistics, &shardId, HASH_FIND, NULL); + if (!shardStats) + { + return 0; + } + return shardStats->totalSize; +} + + +/* + * BuildWorkerShardStatisticsHash returns a shard id -> shard statistics hash containing + * sizes of shards on the source node and destination node. + */ +static HTAB * +BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps, int stepCount) +{ + HTAB *shardsByWorker = GetMovedShardIdsByWorker(steps, stepCount, true); + + HASHCTL info = { + .keysize = sizeof(WorkerHashKey), + .entrysize = sizeof(WorkerShardStatistics), + .hcxt = CurrentMemoryContext + }; + + HTAB *workerShardStatistics = hash_create("WorkerShardStatistics", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + WorkerShardIds *entry = NULL; + + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardsByWorker); + while ((entry = hash_seq_search(&status)) != NULL) + { + int connectionFlags = 0; + MultiConnection *connection = GetNodeConnection(connectionFlags, + entry->worker.hostname, + entry->worker.port); + + HTAB *statistics = + GetShardStatistics(connection, entry->shardIds); + + WorkerHashKey workerKey = { 0 }; + strlcpy(workerKey.hostname, entry->worker.hostname, MAX_NODE_LENGTH); + workerKey.port = entry->worker.port; + + WorkerShardStatistics *moveStat = + hash_search(workerShardStatistics, &entry->worker, HASH_ENTER, NULL); + moveStat->statistics = statistics; + } + + return workerShardStatistics; +} + + +/* + * GetShardStatistics fetches the statics for the given shard ids over the + * given connection. It returns a hashmap where the keys are the shard ids and + * the values are the statistics. + */ +static HTAB * +GetShardStatistics(MultiConnection *connection, HTAB *shardIds) +{ + StringInfo query = makeStringInfo(); + + appendStringInfoString( + query, + "WITH shard_names (shard_id, schema_name, table_name) AS ((VALUES "); + + bool isFirst = true; + uint64 *shardIdPtr = NULL; + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardIds); + while ((shardIdPtr = hash_seq_search(&status)) != NULL) + { + uint64 shardId = *shardIdPtr; + ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid relationId = shardInterval->relationId; + char *shardName = get_rel_name(relationId); + + AppendShardIdToName(&shardName, shardId); + + Oid schemaId = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(schemaId); + if (!isFirst) + { + appendStringInfo(query, ", "); + } + + appendStringInfo(query, "(" UINT64_FORMAT ",%s,%s)", + shardId, + quote_literal_cstr(schemaName), + quote_literal_cstr(shardName)); + + isFirst = false; + } + + appendStringInfoString(query, "))"); + appendStringInfoString( + query, + " SELECT shard_id, coalesce(pg_total_relation_size(tables.relid),0)" + + /* for each shard in shardIds */ + " FROM shard_names" + + /* check if its name can be found in pg_class, if so return size */ + " LEFT JOIN" + " (SELECT c.oid AS relid, c.relname, n.nspname" + " FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace) tables" + " ON tables.relname = shard_names.table_name AND" + " tables.nspname = shard_names.schema_name "); + + PGresult *result = NULL; + int queryResult = ExecuteOptionalRemoteCommand(connection, query->data, &result); + if (queryResult != RESPONSE_OKAY) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot get the size because of a connection error"))); + } + + int rowCount = PQntuples(result); + int colCount = PQnfields(result); + + /* This is not expected to ever happen, but we check just to be sure */ + if (colCount < 2) + { + ereport(ERROR, (errmsg("unexpected number of columns returned by: %s", + query->data))); + } + + HASHCTL info = { + .keysize = sizeof(uint64), + .entrysize = sizeof(ShardStatistics), + .hcxt = CurrentMemoryContext + }; + + HTAB *shardStatistics = hash_create("ShardStatisticsHash", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + + for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + char *shardIdString = PQgetvalue(result, rowIndex, 0); + uint64 shardId = pg_strtouint64(shardIdString, NULL, 10); + char *sizeString = PQgetvalue(result, rowIndex, 1); + uint64 totalSize = pg_strtouint64(sizeString, NULL, 10); + + ShardStatistics *statistics = + hash_search(shardStatistics, &shardId, HASH_ENTER, NULL); + statistics->totalSize = totalSize; + } + + PQclear(result); + + bool raiseErrors = true; + ClearResults(connection, raiseErrors); + + return shardStatistics; +} + + +/* + * GetMovedShardIdsByWorker groups the shard ids in the provided steps by + * worker. It returns a hashmap that contains a set of these shard ids. + */ +static HTAB * +GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, + bool fromSource) +{ + HASHCTL info = { + .keysize = sizeof(WorkerHashKey), + .entrysize = sizeof(WorkerShardIds), + .hcxt = CurrentMemoryContext + }; + + HTAB *shardsByWorker = hash_create("GetRebalanceStepsByWorker", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + + for (int stepIndex = 0; stepIndex < stepCount; stepIndex++) + { + PlacementUpdateEventProgress *step = &(steps[stepIndex]); + + AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, + step->shardId); + + if (step->progress == REBALANCE_PROGRESS_WAITING) + { + /* + * shard move has not started so we don't need target stats for + * this shard + */ + continue; + } + + AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort, + step->shardId); + } + + return shardsByWorker; +} + + +/* + * AddToWorkerShardIdSet adds the shard id to the shard id set for the + * specified worker in the shardsByWorker hashmap. + */ +static void +AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int workerPort, + uint64 shardId) +{ + WorkerHashKey workerKey = { 0 }; + + strlcpy(workerKey.hostname, workerName, MAX_NODE_LENGTH); + workerKey.port = workerPort; + + bool isFound = false; + WorkerShardIds *workerShardIds = + hash_search(shardsByWorker, &workerKey, HASH_ENTER, &isFound); + if (!isFound) + { + HASHCTL info = { + .keysize = sizeof(uint64), + .entrysize = sizeof(uint64), + .hcxt = CurrentMemoryContext + }; + + workerShardIds->shardIds = hash_create( + "WorkerShardIdsSet", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + } + + hash_search(workerShardIds->shardIds, &shardId, HASH_ENTER, NULL); +} + + /* * NonColocatedDistRelationIdList returns a list of distributed table oids, one * for each existing colocation group. @@ -1151,11 +1570,47 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) } +/* + * ConflictShardPlacementUpdateOnlyWithIsolationTesting is only useful for + * testing and should not be called by any code-path except for + * UpdateShardPlacement(). + * + * To be able to test the rebalance monitor functionality correctly, we need to + * be able to pause the rebalancer at a specific place in time. We cannot do + * this by block the shard move itself someway (e.g. by calling truncate on the + * distributed table). The reason for this is that we do the shard move in a + * newly opened connection. This causes our isolation tester block detection to + * not realise that the rebalance_table_shards call is blocked. + * + * So instead, before opening a connection we lock an advisory lock that's + * based on the shard id (shard id mod 1000). By locking this advisory lock in + * a different session we can block the rebalancer in a way that the isolation + * tester block detection is able to detect. + */ +static void +ConflictShardPlacementUpdateOnlyWithIsolationTesting(uint64 shardId) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = false; + + if (RunningUnderIsolationTest) + { + /* we've picked a random lock */ + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_PLACEMENT_UPDATE_ADVISORY_LOCK_FIRST_KEY, + shardId % 1000, 2); + + (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); + } +} + + /* * UpdateShardPlacement copies or moves a shard placement by calling * the corresponding functions in Citus in a subtransaction. */ -static bool +static void UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, List *responsiveNodeList, Oid shardReplicationModeOid) { @@ -1177,13 +1632,9 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, targetNode->workerPort); if (!targetResponsive) { - ereport(WARNING, (errmsg("%s:%d is not responsive", targetNode->workerName, - targetNode->workerPort))); - UpdateColocatedShardPlacementProgress(shardId, - sourceNode->workerName, - sourceNode->workerPort, - REBALANCE_PROGRESS_ERROR); - return false; + ereport(ERROR, (errmsg("target node %s:%d is not responsive", + targetNode->workerName, + targetNode->workerPort))); } /* if source node is not responsive, don't continue */ @@ -1192,13 +1643,9 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, sourceNode->workerPort); if (!sourceResponsive) { - ereport(WARNING, (errmsg("%s:%d is not responsive", sourceNode->workerName, - sourceNode->workerPort))); - UpdateColocatedShardPlacementProgress(shardId, - sourceNode->workerName, - sourceNode->workerPort, - REBALANCE_PROGRESS_ERROR); - return false; + ereport(ERROR, (errmsg("source node %s:%d is not responsive", + sourceNode->workerName, + sourceNode->workerPort))); } if (updateType == PLACEMENT_UPDATE_MOVE) @@ -1235,6 +1682,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, sourceNode->workerPort, REBALANCE_PROGRESS_MOVING); + ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId); int connectionFlag = FORCE_NEW_CONNECTION; MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, PostPortNumber); @@ -1250,8 +1698,6 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, sourceNode->workerPort, REBALANCE_PROGRESS_MOVED); CloseConnection(connection); - - return true; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index f30e4e4ed..e2e1410f6 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1499,6 +1499,19 @@ RegisterCitusConfigVariables(void) GUC_SUPERUSER_ONLY, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.running_under_isolation_test", + gettext_noop( + "Only useful for testing purposes, when set to true, Citus does some " + "tricks to implement useful isolation tests with rebalancing. Should " + "never be set to true on production systems "), + gettext_noop("for details of the tricks implemented, refer to the source code"), + &RunningUnderIsolationTest, + false, + PGC_SUSET, + GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.select_opens_transaction_block", gettext_noop("Open transaction blocks for SELECT commands"), diff --git a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql index f6b385472..5c7d6b07c 100644 --- a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql @@ -22,3 +22,5 @@ ON CONFLICT DO NOTHING; ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ADD COLUMN improvement_threshold float4 NOT NULL default 0; UPDATE pg_catalog.pg_dist_rebalance_strategy SET improvement_threshold = 0.5 WHERE name = 'by_disk_size'; + +#include "udfs/get_rebalance_progress/10.1-1.sql" diff --git a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql index f873ac4b2..dccc66d16 100644 --- a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql +++ b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql @@ -83,24 +83,7 @@ DROP EXTENSION IF EXISTS shard_rebalancer; #include "udfs/get_rebalance_table_shards_plan/9.0-1.sql" #include "udfs/replicate_table_shards/9.0-1.sql" #include "udfs/rebalance_table_shards/9.0-1.sql" - --- get_rebalance_progress returns the list of shard placement move operations along with --- their progressions for ongoing rebalance operations. --- -CREATE OR REPLACE FUNCTION get_rebalance_progress() - RETURNS TABLE(sessionid integer, - table_name regclass, - shardid bigint, - shard_size bigint, - sourcename text, - sourceport int, - targetname text, - targetport int, - progress bigint) - AS 'MODULE_PATHNAME' - LANGUAGE C STRICT; -COMMENT ON FUNCTION get_rebalance_progress() - IS 'provides progress information about the ongoing rebalance operations'; +#include "udfs/get_rebalance_progress/9.0-1.sql" DROP FUNCTION master_add_node(text, integer, integer, noderole, name); CREATE FUNCTION master_add_node(nodename text, diff --git a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql index c5a947553..c9fd0c895 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql @@ -36,3 +36,8 @@ DROP FUNCTION pg_catalog.citus_add_rebalance_strategy; #include "../udfs/citus_add_rebalance_strategy/9.2-1.sql" ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DROP COLUMN improvement_threshold; + +-- the migration for get_rebalance_progress from 9.0-1 was the first one, +-- so it doesn't have a DROP. This is why we DROP manually here. +DROP FUNCTION pg_catalog.get_rebalance_progress; +#include "../udfs/get_rebalance_progress/9.0-1.sql" diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/10.1-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/10.1-1.sql new file mode 100644 index 000000000..7df399ac1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/10.1-1.sql @@ -0,0 +1,18 @@ +DROP FUNCTION pg_catalog.get_rebalance_progress(); + +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint, + source_shard_size bigint, + target_shard_size bigint) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/9.0-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/9.0-1.sql new file mode 100644 index 000000000..00adeec1f --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/9.0-1.sql @@ -0,0 +1,17 @@ +-- get_rebalance_progress returns the list of shard placement move operations along with +-- their progressions for ongoing rebalance operations. +-- +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql new file mode 100644 index 000000000..7df399ac1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql @@ -0,0 +1,18 @@ +DROP FUNCTION pg_catalog.get_rebalance_progress(); + +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint, + source_shard_size bigint, + target_shard_size bigint) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 53c34cb21..ea1ff6827 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -105,7 +105,6 @@ typedef struct PlacementUpdateEventProgress int sourcePort; char targetName[255]; int targetPort; - uint64 shardSize; uint64 progress; } PlacementUpdateEventProgress; @@ -172,6 +171,7 @@ typedef struct RebalancePlanFunctions } RebalancePlanFunctions; extern int MaxRebalancerLoggedIgnoredMoves; +extern bool RunningUnderIsolationTest; /* External function declarations */ extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/isolation_shard_rebalancer.out b/src/test/regress/expected/isolation_shard_rebalancer.out index 4bb980230..2eab1d8b8 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer.out +++ b/src/test/regress/expected/isolation_shard_rebalancer.out @@ -286,7 +286,7 @@ rebalance_table_shards step s2-rebalance-all: select rebalance_table_shards(); -ERROR: could not acquire the lock required to rebalance public.distributed_transaction_id_table +ERROR: could not acquire the lock required to rebalance public.colocated1 step s1-commit: COMMIT; @@ -308,7 +308,7 @@ replicate_table_shards step s2-rebalance-all: select rebalance_table_shards(); -ERROR: could not acquire the lock required to rebalance public.distributed_transaction_id_table +ERROR: could not acquire the lock required to rebalance public.colocated1 step s1-commit: COMMIT; @@ -374,7 +374,7 @@ rebalance_table_shards step s2-drain: select master_drain_node('localhost', 57638); -ERROR: could not acquire the lock required to move public.distributed_transaction_id_table +ERROR: could not acquire the lock required to move public.colocated1 step s1-commit: COMMIT; @@ -396,7 +396,7 @@ replicate_table_shards step s2-drain: select master_drain_node('localhost', 57638); -ERROR: could not acquire the lock required to move public.distributed_transaction_id_table +ERROR: could not acquire the lock required to move public.colocated1 step s1-commit: COMMIT; diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out new file mode 100644 index 000000000..073a9422e --- /dev/null +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -0,0 +1,127 @@ +Parsed test spec with 3 sessions + +starting permutation: s2-lock-1 s2-lock-2 s1-rebalance-c1 s3-progress s2-unlock-1 s3-progress s2-unlock-2 s3-progress s1-commit s3-progress +master_set_node_property + + +step s2-lock-1: + SELECT pg_advisory_lock(29279, 1); + +pg_advisory_lock + + +step s2-lock-2: + SELECT pg_advisory_lock(29279, 2); + +pg_advisory_lock + + +step s1-rebalance-c1: + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); + +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +colocated1 1500001 49152 localhost 57637 49152 localhost 57638 0 1 +colocated2 1500005 376832 localhost 57637 376832 localhost 57638 0 1 +colocated1 1500002 196608 localhost 57637 196608 localhost 57638 0 0 +colocated2 1500006 8192 localhost 57637 8192 localhost 57638 0 0 +step s2-unlock-1: + SELECT pg_advisory_unlock(29279, 1); + +pg_advisory_unlock + +t +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +colocated1 1500001 49152 localhost 57637 0 localhost 57638 49152 2 +colocated2 1500005 376832 localhost 57637 0 localhost 57638 376832 2 +colocated1 1500002 196608 localhost 57637 196608 localhost 57638 0 1 +colocated2 1500006 8192 localhost 57637 8192 localhost 57638 0 1 +step s2-unlock-2: + SELECT pg_advisory_unlock(29279, 2); + +pg_advisory_unlock + +t +step s1-rebalance-c1: <... completed> +table_name shardid shard_size sourcename sourceport targetname targetport + +colocated1 1500001 0 localhost 57637 localhost 57638 +colocated2 1500005 0 localhost 57637 localhost 57638 +colocated1 1500002 0 localhost 57637 localhost 57638 +colocated2 1500006 0 localhost 57637 localhost 57638 +rebalance_table_shards + + +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +step s1-commit: + COMMIT; + +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +restore_isolation_tester_func + + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5654eb248..06f54321d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -564,20 +564,22 @@ SELECT * FROM print_extension_changes(); -- Snapshot of state at 10.1-1 ALTER EXTENSION citus UPDATE TO '10.1-1'; SELECT * FROM print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real) void | function citus_internal.columnar_ensure_objects_exist() void | function create_distributed_table(regclass,text,citus.distribution_type,text) void | + function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | | function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void | function citus_local_disk_space_stats() record | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void + | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | function worker_partitioned_relation_size(regclass) bigint | function worker_partitioned_relation_total_size(regclass) bigint | function worker_partitioned_table_size(regclass) bigint -(11 rows) +(13 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index c6d68a023..e9d668387 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -181,6 +181,9 @@ RESET citus.shard_count; RESET citus.shard_replication_factor; RESET citus.replication_model; -- Create a user to test multiuser usage of rebalancer functions +-- We explicitely don't create this user on worker nodes yet, so we can +-- test some more error handling. We create them later there. +SET citus.enable_create_role_propagation TO OFF; CREATE USER testrole; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. @@ -624,17 +627,48 @@ SELECT * FROM table_placements_per_node; -- Check that max_shard_moves limits number of move operations -- First check that we error if not table owner +-- Turn on NOTICE messages +SET ROLE testrole; +-- Make sure that rebalance is stopped if source or target nodes are +-- unresponsive. +SELECT rebalance_table_shards('rebalance_test_table', + shard_transfer_mode:='block_writes'); +ERROR: target node localhost:xxxxx is not responsive +\c - - - :worker_1_port +SET citus.enable_create_role_propagation TO OFF; +CREATE USER testrole; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +GRANT ALL ON SCHEMA public TO testrole; +\c - - - :master_port +SET client_min_messages TO WARNING; SET ROLE testrole; SELECT rebalance_table_shards('rebalance_test_table', - threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); -WARNING: localhost:xxxxx is not responsive - rebalance_table_shards ---------------------------------------------------------------------- - -(1 row) - +ERROR: source node localhost:xxxxx is not responsive +\c - - - :worker_2_port +SET citus.enable_create_role_propagation TO OFF; +CREATE USER testrole; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +GRANT ALL ON SCHEMA public TO testrole; +\c - - - :master_port +SET client_min_messages TO WARNING; +SET citus.next_shard_id TO 123010; +SET ROLE testrole; +SELECT rebalance_table_shards('rebalance_test_table', + shard_transfer_mode:='block_writes'); +ERROR: must be owner of table rebalance_test_table +CONTEXT: while executing command on localhost:xxxxx RESET ROLE; +-- Confirm no moves took place at all during these errors +SELECT * FROM table_placements_per_node; + nodeport | logicalrelid | count +--------------------------------------------------------------------- + 57637 | rebalance_test_table | 1 + 57638 | rebalance_test_table | 5 +(2 rows) + SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); @@ -1002,7 +1036,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala -- Check that we can call this function SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size --------------------------------------------------------------------- (0 rows) @@ -1015,7 +1049,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size --------------------------------------------------------------------- (0 rows) @@ -1314,8 +1348,6 @@ RESET search_path; DROP SCHEMA test_schema_support CASCADE; \set VERBOSITY default REVOKE ALL ON SCHEMA public FROM testrole; -ERROR: role "testrole" does not exist -CONTEXT: while executing command on localhost:xxxxx DROP USER testrole; -- Test costs set citus.shard_count = 4; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 127bafd01..208d7e228 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -73,6 +73,7 @@ test: isolation_blocking_move_single_shard_commands_on_mx test: isolation_blocking_move_multi_shard_commands_on_mx test: isolation_shard_rebalancer test: isolation_rebalancer_deferred_drop +test: isolation_shard_rebalancer_progress # MX tests test: isolation_reference_on_mx diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index c0d354640..a2c5bb252 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -499,6 +499,8 @@ if($isolationtester) push(@pgOptions, "citus.metadata_sync_interval=1000"); push(@pgOptions, "citus.metadata_sync_retry_interval=100"); push(@pgOptions, "client_min_messages='warning'"); # pg12 introduced notice showing during isolation tests + push(@pgOptions, "citus.running_under_isolation_test=true"); + } # Add externally added options last, so they overwrite the default ones above diff --git a/src/test/regress/spec/isolation_shard_rebalancer.spec b/src/test/regress/spec/isolation_shard_rebalancer.spec index ab3e0e6fe..1aca39ca6 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer.spec @@ -3,9 +3,9 @@ setup SELECT 1 FROM master_add_node('localhost', 57637); SELECT 1 FROM master_add_node('localhost', 57638); CREATE TABLE colocated1 (test_id integer NOT NULL, data text); - SELECT create_distributed_table('colocated1', 'test_id', 'hash'); + SELECT create_distributed_table('colocated1', 'test_id', 'hash', 'none'); CREATE TABLE colocated2 (test_id integer NOT NULL, data text); - SELECT create_distributed_table('colocated2', 'test_id', 'hash'); + SELECT create_distributed_table('colocated2', 'test_id', 'hash', 'colocated1'); CREATE TABLE non_colocated (test_id integer NOT NULL, data text); SELECT create_distributed_table('non_colocated', 'test_id', 'hash', 'none'); } diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec new file mode 100644 index 000000000..d2248292d --- /dev/null +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -0,0 +1,84 @@ +setup +{ + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); + select setval('pg_dist_shardid_seq', GREATEST(1500000, nextval('pg_dist_shardid_seq'))); + SET citus.shard_count TO 4; + SET citus.shard_replication_factor TO 1; + SELECT 1 FROM master_add_node('localhost', 57637); + SELECT master_set_node_property('localhost', 57638, 'shouldhaveshards', false); + CREATE TABLE colocated1 (test_id integer NOT NULL, data text); + SELECT create_distributed_table('colocated1', 'test_id', 'hash', 'none'); + CREATE TABLE colocated2 (test_id integer NOT NULL, data text); + SELECT create_distributed_table('colocated2', 'test_id', 'hash', 'colocated1'); + -- 1 and 3 are chosen so they go to shard 1 and 2 + INSERT INTO colocated1(test_id) SELECT 1 from generate_series(0, 1000) i; + INSERT INTO colocated2(test_id) SELECT 1 from generate_series(0, 10000) i; + INSERT INTO colocated1(test_id) SELECT 3 from generate_series(0, 5000) i; + select * from pg_dist_placement; + SELECT master_set_node_property('localhost', 57638, 'shouldhaveshards', true); +} + +teardown +{ + DROP TABLE colocated2; + DROP TABLE colocated1; + SELECT citus_internal.restore_isolation_tester_func(); +} + +session "s1" + +step "s1-rebalance-c1" +{ + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-lock-1" +{ + SELECT pg_advisory_lock(29279, 1); +} + +step "s2-lock-2" +{ + SELECT pg_advisory_lock(29279, 2); +} + +step "s2-unlock-1" +{ + SELECT pg_advisory_unlock(29279, 1); +} + +step "s2-unlock-2" +{ + SELECT pg_advisory_unlock(29279, 2); +} + +session "s3" + +step "s3-progress" +{ + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); +} + +permutation "s2-lock-1" "s2-lock-2" "s1-rebalance-c1" "s3-progress" "s2-unlock-1" "s3-progress" "s2-unlock-2" "s3-progress" "s1-commit" "s3-progress" diff --git a/src/test/regress/spec/shared_connection_waits.spec b/src/test/regress/spec/shared_connection_waits.spec index 80a8cb9e3..e7cd7eaca 100644 --- a/src/test/regress/spec/shared_connection_waits.spec +++ b/src/test/regress/spec/shared_connection_waits.spec @@ -22,6 +22,7 @@ teardown SELECT set_max_shared_pool_size(100); DROP FUNCTION wake_up_connection_pool_waiters(); DROP FUNCTION set_max_shared_pool_size(int); + DROP TABLE test; } session "s1" diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index dd6110f68..27146caaa 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -80,6 +80,9 @@ RESET citus.shard_replication_factor; RESET citus.replication_model; -- Create a user to test multiuser usage of rebalancer functions +-- We explicitely don't create this user on worker nodes yet, so we can +-- test some more error handling. We create them later there. +SET citus.enable_create_role_propagation TO OFF; CREATE USER testrole; GRANT ALL ON SCHEMA public TO testrole; @@ -436,11 +439,34 @@ SELECT * FROM table_placements_per_node; -- Check that max_shard_moves limits number of move operations -- First check that we error if not table owner +-- Turn on NOTICE messages +SET ROLE testrole; +-- Make sure that rebalance is stopped if source or target nodes are +-- unresponsive. +SELECT rebalance_table_shards('rebalance_test_table', + shard_transfer_mode:='block_writes'); +\c - - - :worker_1_port +SET citus.enable_create_role_propagation TO OFF; +CREATE USER testrole; +GRANT ALL ON SCHEMA public TO testrole; +\c - - - :master_port +SET client_min_messages TO WARNING; +SET ROLE testrole; +SELECT rebalance_table_shards('rebalance_test_table', + shard_transfer_mode:='block_writes'); +\c - - - :worker_2_port +SET citus.enable_create_role_propagation TO OFF; +CREATE USER testrole; +GRANT ALL ON SCHEMA public TO testrole; +\c - - - :master_port +SET client_min_messages TO WARNING; +SET citus.next_shard_id TO 123010; SET ROLE testrole; SELECT rebalance_table_shards('rebalance_test_table', - threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); RESET ROLE; +-- Confirm no moves took place at all during these errors +SELECT * FROM table_placements_per_node; SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1,