diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index d40e198be..4e3791de1 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -31,6 +31,7 @@
 #include "distributed/enterprise.h"
 #include "distributed/hash_helpers.h"
 #include "distributed/listutils.h"
+#include "distributed/lock_graph.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_client_executor.h"
@@ -139,11 +140,52 @@ typedef struct RebalanceContext
 	FmgrInfo shardAllowedOnNodeUDF;
 } RebalanceContext;
 
+/* WorkerHashKey contains hostname and port to be used as a key in a hash */
+typedef struct WorkerHashKey
+{
+	char hostname[MAX_NODE_LENGTH];
+	int port;
+} WorkerHashKey;
+
+/* WorkerShardIds represents a set of shardIds grouped by worker */
+typedef struct WorkerShardIds
+{
+	WorkerHashKey worker;
+
+	/* This is a uint64 hashset representing the shard ids for a specific worker */
+	HTAB *shardIds;
+} WorkerShardIds;
+
+/* ShardStatistics contains statistics about a shard */
+typedef struct ShardStatistics
+{
+	uint64 shardId;
+
+	/* The size of the shard in bytes. */
+	uint64 totalSize;
+} ShardStatistics;
+
+/*
+ * WorkerShardStatistics represents a set of statistics about shards,
+ * grouped by worker.
+ */
+typedef struct WorkerShardStatistics
+{
+	WorkerHashKey worker;
+
+	/*
+	 * Statistics for each shard on this worker:
+	 * key: shardId
+	 * value: ShardStatistics
+	 */
+	HTAB *statistics;
+} WorkerShardStatistics;
+
 /* static declarations for main logic */
 static int ShardActivePlacementCount(HTAB *activePlacementsHash, uint64 shardId,
 									 List *activeWorkerNodeList);
-static bool UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
+static void UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 								 List *responsiveNodeList, Oid shardReplicationModeOid);
 
 /* static declarations for main logic's utility functions */
@@ -192,6 +234,17 @@ static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name);
 static void EnsureShardCostUDF(Oid functionOid);
 static void EnsureNodeCapacityUDF(Oid functionOid);
 static void EnsureShardAllowedOnNodeUDF(Oid functionOid);
+static void ConflictShardPlacementUpdateOnlyWithIsolationTesting(uint64 shardId);
+static HTAB * BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps,
+											 int stepCount);
+static HTAB * GetShardStatistics(MultiConnection *connection, HTAB *shardIds);
+static HTAB * GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps,
+									   int stepCount, bool fromSource);
+static uint64 WorkerShardSize(HTAB *workerShardStatistics,
+							  char *workerName, int workerPort, uint64 shardId);
+static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int workerPort,
+								  uint64 shardId);
+static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics);
 
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(rebalance_table_shards);
@@ -204,8 +257,17 @@
 PG_FUNCTION_INFO_V1(citus_shard_cost_by_disk_size);
 PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions);
 PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
 
+bool RunningUnderIsolationTest = false;
 int MaxRebalancerLoggedIgnoredMoves = 5;
 
+/*
+ * This is a randomly generated, hardcoded number. It's used as the first part
+ * of the advisory lock identifier used during isolation tests. See the
+ * comments on ConflictShardPlacementUpdateOnlyWithIsolationTesting for more
+ * information.
+ */
+#define SHARD_PLACEMENT_UPDATE_ADVISORY_LOCK_FIRST_KEY 29279
+
 
 #ifdef USE_ASSERT_CHECKING
 
@@ -464,8 +526,7 @@ NodeCapacity(WorkerNode *workerNode, void *voidContext)
 static ShardCost
 GetShardCost(uint64 shardId, void *voidContext)
 {
-	ShardCost shardCost;
-	memset_struct_0(shardCost);
+	ShardCost shardCost = { 0 };
 	shardCost.shardId = shardId;
 	RebalanceContext *context = voidContext;
 	Datum shardCostDatum = FunctionCall1(&context->shardCostUDF, UInt64GetDatum(shardId));
@@ -698,8 +759,6 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
 		event->shardId = colocatedUpdate->shardId;
 		event->sourcePort = colocatedUpdate->sourceNode->workerPort;
 		event->targetPort = colocatedUpdate->targetNode->workerPort;
-		event->shardSize = ShardLength(colocatedUpdate->shardId);
-
 		eventIndex++;
 	}
 }
@@ -1015,19 +1074,35 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
 	List *rebalanceMonitorList = ProgressMonitorList(REBALANCE_ACTIVITY_MAGIC_NUMBER,
 													 &segmentList);
+
 	foreach(rebalanceMonitorCell, rebalanceMonitorList)
 	{
 		ProgressMonitorData *monitor = lfirst(rebalanceMonitorCell);
 		PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps;
-
+		HTAB *shardStatistics = BuildWorkerShardStatisticsHash(monitor->steps,
+															   monitor->stepCount);
+		HTAB *shardSizes = BuildShardSizesHash(monitor, shardStatistics);
 		for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++)
 		{
 			PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
 			uint64 shardId = step->shardId;
 			ShardInterval *shardInterval = LoadShardInterval(shardId);
 
-			Datum values[9];
-			bool nulls[9];
+			uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName,
+												step->sourcePort, shardId);
+			uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName,
+												step->targetPort, shardId);
+
+			uint64 shardSize = 0;
+			ShardStatistics *shardSizesStat =
+				hash_search(shardSizes, &shardId, HASH_FIND, NULL);
+			if (shardSizesStat)
+			{
+				shardSize = shardSizesStat->totalSize;
+			}
+
+			Datum values[11];
+			bool nulls[11];
 
 			memset(values, 0, sizeof(values));
 			memset(nulls, 0, sizeof(nulls));
@@ -1035,12 +1110,14 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
 			values[0] = monitor->processId;
 			values[1] = ObjectIdGetDatum(shardInterval->relationId);
 			values[2] = UInt64GetDatum(shardId);
-			values[3] = UInt64GetDatum(step->shardSize);
+			values[3] = UInt64GetDatum(shardSize);
 			values[4] = PointerGetDatum(cstring_to_text(step->sourceName));
 			values[5] = UInt32GetDatum(step->sourcePort);
 			values[6] = PointerGetDatum(cstring_to_text(step->targetName));
 			values[7] = UInt32GetDatum(step->targetPort);
 			values[8] = UInt64GetDatum(step->progress);
+			values[9] = UInt64GetDatum(sourceSize);
+			values[10] = UInt64GetDatum(targetSize);
 
 			tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 		}
@@ -1054,6 +1131,348 @@
 }
 
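For illustration (not part of the patch): with the two new columns, a monitoring session can see the size of a moving shard on both nodes at once. A minimal sketch, with values borrowed from the isolation test output added later in this diff:

```sql
SELECT shardid, shard_size, source_shard_size, target_shard_size, progress
FROM get_rebalance_progress();
--  shardid | shard_size | source_shard_size | target_shard_size | progress
-- ---------+------------+-------------------+-------------------+----------
--  1500001 |      49152 |             49152 |                 0 |        1
--  1500005 |     376832 |            376832 |                 0 |        1
```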
+/*
+ * BuildShardSizesHash creates a hash that maps a shardid to its full size
+ * within the cluster. It does this by using the rebalance progress monitor
+ * state to find the node the shard is currently on. It then looks up the
+ * shard size in the shardStatistics hashmap for this node.
+ */
+static HTAB *
+BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
+{
+	HASHCTL info = {
+		.keysize = sizeof(uint64),
+		.entrysize = sizeof(ShardStatistics),
+		.hcxt = CurrentMemoryContext
+	};
+
+	HTAB *shardSizes = hash_create(
+		"ShardSizeHash", 32, &info,
+		HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+	PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps;
+	for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++)
+	{
+		PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
+		uint64 shardId = step->shardId;
+		uint64 shardSize = 0;
+		uint64 backupShardSize = 0;
+
+		uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName,
+											step->sourcePort, shardId);
+		uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName,
+											step->targetPort, shardId);
+
+		if (step->progress == REBALANCE_PROGRESS_WAITING ||
+			step->progress == REBALANCE_PROGRESS_MOVING)
+		{
+			/*
+			 * If we are not done with the move, the correct shard size is the
+			 * size on the source.
+			 */
+			shardSize = sourceSize;
+			backupShardSize = targetSize;
+		}
+		else if (step->progress == REBALANCE_PROGRESS_MOVED)
+		{
+			/*
+			 * If we are done with the move, the correct shard size is the size
+			 * on the target.
+			 */
+			shardSize = targetSize;
+			backupShardSize = sourceSize;
+		}
+
+		if (shardSize == 0)
+		{
+			if (backupShardSize == 0)
+			{
+				/*
+				 * We don't have any useful shard size. This can happen when a
+				 * shard is moved multiple times and is not present on either
+				 * of these nodes. Probably the shard is on a worker related to
+				 * another event. In the unlikely case that this shard is on
+				 * these nodes and actually has size 0, we will have no entry
+				 * in the hashmap. When fetching from it we always default to 0
+				 * if no entry is found, so that's fine.
+				 */
+				continue;
+			}
+
+			/*
+			 * There exist some race conditions where it's possible that the
+			 * state of the steps we see in shared memory is a bit behind what
+			 * is actually going on. So it is possible that even though a step
+			 * is reported as still being in the MOVING state, the shard move
+			 * might have just finished. This in turn can mean that the source
+			 * size is 0 while the target size is not. We try to handle such
+			 * rare edge cases by falling back on the other shard size if that
+			 * one is not 0.
+			 */
+			shardSize = backupShardSize;
+		}
+
+		ShardStatistics *currentWorkerStatistics =
+			hash_search(shardSizes, &shardId, HASH_ENTER, NULL);
+		currentWorkerStatistics->totalSize = shardSize;
+	}
+	return shardSizes;
+}
+
+
+/*
+ * WorkerShardSize returns the size of a shard in bytes on a worker, based on
+ * the workerShardStatisticsHash.
+ */
+static uint64
+WorkerShardSize(HTAB *workerShardStatisticsHash, char *workerName, int workerPort,
+				uint64 shardId)
+{
+	WorkerHashKey workerKey = { 0 };
+	strlcpy(workerKey.hostname, workerName, MAX_NODE_LENGTH);
+	workerKey.port = workerPort;
+
+	WorkerShardStatistics *workerStats =
+		hash_search(workerShardStatisticsHash, &workerKey, HASH_FIND, NULL);
+	if (!workerStats)
+	{
+		return 0;
+	}
+
+	ShardStatistics *shardStats =
+		hash_search(workerStats->statistics, &shardId, HASH_FIND, NULL);
+	if (!shardStats)
+	{
+		return 0;
+	}
+	return shardStats->totalSize;
+}
+
+
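The state machine above can be cross-checked from SQL. A hypothetical sanity query (it ignores the race-condition fallback described in the comments): while a move is waiting or in flight (progress 0 or 1) shard_size should track source_shard_size, and once it is done (progress 2) target_shard_size.

```sql
SELECT shardid,
       shard_size = CASE progress
                        WHEN 2 THEN target_shard_size
                        ELSE source_shard_size
                    END AS size_from_expected_node
FROM get_rebalance_progress();
```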
+/*
+ * BuildWorkerShardStatisticsHash returns a hash keyed by worker, where each
+ * value contains a shard id -> shard statistics hash with the sizes of the
+ * shards on that worker.
+ */
+static HTAB *
+BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps, int stepCount)
+{
+	HTAB *shardsByWorker = GetMovedShardIdsByWorker(steps, stepCount, true);
+
+	HASHCTL info = {
+		.keysize = sizeof(WorkerHashKey),
+		.entrysize = sizeof(WorkerShardStatistics),
+		.hcxt = CurrentMemoryContext
+	};
+
+	HTAB *workerShardStatistics = hash_create("WorkerShardStatistics", 32, &info,
+											  HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+	WorkerShardIds *entry = NULL;
+
+	HASH_SEQ_STATUS status;
+	hash_seq_init(&status, shardsByWorker);
+	while ((entry = hash_seq_search(&status)) != NULL)
+	{
+		int connectionFlags = 0;
+		MultiConnection *connection = GetNodeConnection(connectionFlags,
+														entry->worker.hostname,
+														entry->worker.port);
+
+		HTAB *statistics =
+			GetShardStatistics(connection, entry->shardIds);
+
+		WorkerShardStatistics *moveStat =
+			hash_search(workerShardStatistics, &entry->worker, HASH_ENTER, NULL);
+		moveStat->statistics = statistics;
+	}
+
+	return workerShardStatistics;
+}
+
+
+/*
+ * GetShardStatistics fetches the statistics for the given shard ids over the
+ * given connection. It returns a hashmap where the keys are the shard ids and
+ * the values are the statistics.
+ */
+static HTAB *
+GetShardStatistics(MultiConnection *connection, HTAB *shardIds)
+{
+	StringInfo query = makeStringInfo();
+
+	appendStringInfoString(
+		query,
+		"WITH shard_names (shard_id, schema_name, table_name) AS ((VALUES ");
+
+	bool isFirst = true;
+	uint64 *shardIdPtr = NULL;
+	HASH_SEQ_STATUS status;
+	hash_seq_init(&status, shardIds);
+	while ((shardIdPtr = hash_seq_search(&status)) != NULL)
+	{
+		uint64 shardId = *shardIdPtr;
+		ShardInterval *shardInterval = LoadShardInterval(shardId);
+		Oid relationId = shardInterval->relationId;
+		char *shardName = get_rel_name(relationId);
+
+		AppendShardIdToName(&shardName, shardId);
+
+		Oid schemaId = get_rel_namespace(relationId);
+		char *schemaName = get_namespace_name(schemaId);
+		if (!isFirst)
+		{
+			appendStringInfo(query, ", ");
+		}
+
+		appendStringInfo(query, "(" UINT64_FORMAT ",%s,%s)",
+						 shardId,
+						 quote_literal_cstr(schemaName),
+						 quote_literal_cstr(shardName));
+
+		isFirst = false;
+	}
+
+	appendStringInfoString(query, "))");
+	appendStringInfoString(
+		query,
+		" SELECT shard_id, coalesce(pg_total_relation_size(tables.relid),0)"
+
+		/* for each shard in shardIds */
+		" FROM shard_names"
+
+		/* check if its name can be found in pg_class, if so return size */
+		" LEFT JOIN"
+		" (SELECT c.oid AS relid, c.relname, n.nspname"
+		" FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace) tables"
+		" ON tables.relname = shard_names.table_name AND"
+		" tables.nspname = shard_names.schema_name ");
+
+	PGresult *result = NULL;
+	int queryResult = ExecuteOptionalRemoteCommand(connection, query->data, &result);
+	if (queryResult != RESPONSE_OKAY)
+	{
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("cannot get the size because of a connection error")));
+	}
+
+	int rowCount = PQntuples(result);
+	int colCount = PQnfields(result);
+
+	/* This is not expected to ever happen, but we check just to be sure */
+	if (colCount < 2)
+	{
+		ereport(ERROR, (errmsg("unexpected number of columns returned by: %s",
+							   query->data)));
+	}
+
+	HASHCTL info = {
+		.keysize = sizeof(uint64),
+		.entrysize = sizeof(ShardStatistics),
+		.hcxt = CurrentMemoryContext
+	};
+
+	HTAB *shardStatistics = hash_create("ShardStatisticsHash", 32, &info,
+										HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
+	{
+		char *shardIdString = PQgetvalue(result, rowIndex, 0);
+		uint64 shardId = pg_strtouint64(shardIdString, NULL, 10);
+		char *sizeString = PQgetvalue(result, rowIndex, 1);
+		uint64 totalSize = pg_strtouint64(sizeString, NULL, 10);
+
+		ShardStatistics *statistics =
+			hash_search(shardStatistics, &shardId, HASH_ENTER, NULL);
+		statistics->totalSize = totalSize;
+	}
+
+	PQclear(result);
+
+	bool raiseErrors = true;
+	ClearResults(connection, raiseErrors);
+
+	return shardStatistics;
+}
+
+
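To make the string building above concrete, this is roughly the statement GetShardStatistics sends for a single shard; the shard id, schema, and relation name are illustrative. The LEFT JOIN plus coalesce makes shards that don't exist on the node report size 0 instead of erroring out:

```sql
WITH shard_names (shard_id, schema_name, table_name) AS ((VALUES
    (1500001, 'public', 'colocated1_1500001')))
SELECT shard_id, coalesce(pg_total_relation_size(tables.relid), 0)
FROM shard_names
LEFT JOIN
    (SELECT c.oid AS relid, c.relname, n.nspname
     FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace) tables
ON tables.relname = shard_names.table_name AND
   tables.nspname = shard_names.schema_name;
```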
hash_create("ShardStatisticsHash", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + + for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + char *shardIdString = PQgetvalue(result, rowIndex, 0); + uint64 shardId = pg_strtouint64(shardIdString, NULL, 10); + char *sizeString = PQgetvalue(result, rowIndex, 1); + uint64 totalSize = pg_strtouint64(sizeString, NULL, 10); + + ShardStatistics *statistics = + hash_search(shardStatistics, &shardId, HASH_ENTER, NULL); + statistics->totalSize = totalSize; + } + + PQclear(result); + + bool raiseErrors = true; + ClearResults(connection, raiseErrors); + + return shardStatistics; +} + + +/* + * GetMovedShardIdsByWorker groups the shard ids in the provided steps by + * worker. It returns a hashmap that contains a set of these shard ids. + */ +static HTAB * +GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, + bool fromSource) +{ + HASHCTL info = { + .keysize = sizeof(WorkerHashKey), + .entrysize = sizeof(WorkerShardIds), + .hcxt = CurrentMemoryContext + }; + + HTAB *shardsByWorker = hash_create("GetRebalanceStepsByWorker", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + + for (int stepIndex = 0; stepIndex < stepCount; stepIndex++) + { + PlacementUpdateEventProgress *step = &(steps[stepIndex]); + + AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, + step->shardId); + + if (step->progress == REBALANCE_PROGRESS_WAITING) + { + /* + * shard move has not started so we don't need target stats for + * this shard + */ + continue; + } + + AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort, + step->shardId); + } + + return shardsByWorker; +} + + +/* + * AddToWorkerShardIdSet adds the shard id to the shard id set for the + * specified worker in the shardsByWorker hashmap. + */ +static void +AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int workerPort, + uint64 shardId) +{ + WorkerHashKey workerKey = { 0 }; + + strlcpy(workerKey.hostname, workerName, MAX_NODE_LENGTH); + workerKey.port = workerPort; + + bool isFound = false; + WorkerShardIds *workerShardIds = + hash_search(shardsByWorker, &workerKey, HASH_ENTER, &isFound); + if (!isFound) + { + HASHCTL info = { + .keysize = sizeof(uint64), + .entrysize = sizeof(uint64), + .hcxt = CurrentMemoryContext + }; + + workerShardIds->shardIds = hash_create( + "WorkerShardIdsSet", 32, &info, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + } + + hash_search(workerShardIds->shardIds, &shardId, HASH_ENTER, NULL); +} + + /* * NonColocatedDistRelationIdList returns a list of distributed table oids, one * for each existing colocation group. @@ -1151,11 +1570,47 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) } +/* + * ConflictShardPlacementUpdateOnlyWithIsolationTesting is only useful for + * testing and should not be called by any code-path except for + * UpdateShardPlacement(). + * + * To be able to test the rebalance monitor functionality correctly, we need to + * be able to pause the rebalancer at a specific place in time. We cannot do + * this by block the shard move itself someway (e.g. by calling truncate on the + * distributed table). The reason for this is that we do the shard move in a + * newly opened connection. This causes our isolation tester block detection to + * not realise that the rebalance_table_shards call is blocked. + * + * So instead, before opening a connection we lock an advisory lock that's + * based on the shard id (shard id mod 1000). 
 /*
  * UpdateShardPlacement copies or moves a shard placement by calling
  * the corresponding functions in Citus in a subtransaction.
  */
-static bool
+static void
 UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 					 List *responsiveNodeList, Oid shardReplicationModeOid)
 {
@@ -1177,13 +1632,9 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 									 targetNode->workerPort);
 	if (!targetResponsive)
 	{
-		ereport(WARNING, (errmsg("%s:%d is not responsive", targetNode->workerName,
-								 targetNode->workerPort)));
-		UpdateColocatedShardPlacementProgress(shardId,
-											  sourceNode->workerName,
-											  sourceNode->workerPort,
-											  REBALANCE_PROGRESS_ERROR);
-		return false;
+		ereport(ERROR, (errmsg("target node %s:%d is not responsive",
+							   targetNode->workerName,
+							   targetNode->workerPort)));
 	}
 
 	/* if source node is not responsive, don't continue */
@@ -1192,13 +1643,9 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 									 sourceNode->workerPort);
 	if (!sourceResponsive)
 	{
-		ereport(WARNING, (errmsg("%s:%d is not responsive", sourceNode->workerName,
-								 sourceNode->workerPort)));
-		UpdateColocatedShardPlacementProgress(shardId,
-											  sourceNode->workerName,
-											  sourceNode->workerPort,
-											  REBALANCE_PROGRESS_ERROR);
-		return false;
+		ereport(ERROR, (errmsg("source node %s:%d is not responsive",
+							   sourceNode->workerName,
+							   sourceNode->workerPort)));
 	}
 
 	if (updateType == PLACEMENT_UPDATE_MOVE)
@@ -1235,6 +1682,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 										  sourceNode->workerPort,
 										  REBALANCE_PROGRESS_MOVING);
 
+	ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId);
 	int connectionFlag = FORCE_NEW_CONNECTION;
 	MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
 													PostPortNumber);
@@ -1250,8 +1698,6 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 										  sourceNode->workerPort,
 										  REBALANCE_PROGRESS_MOVED);
 	CloseConnection(connection);
-
-	return true;
 }
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index f30e4e4ed..e2e1410f6 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -1499,6 +1499,19 @@ RegisterCitusConfigVariables(void)
 		GUC_SUPERUSER_ONLY,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.running_under_isolation_test",
+		gettext_noop(
+			"Only useful for testing purposes. When set to true, Citus does some "
+			"tricks to implement useful isolation tests with rebalancing. Should "
+			"never be set to true on production systems."),
+		gettext_noop("for details of the tricks implemented, refer to the source code"),
+		&RunningUnderIsolationTest,
+		false,
+		PGC_SUSET,
+		GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
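Usage sketch: the test harness change below enables this GUC cluster-wide for the isolation schedule, but since it is PGC_SUSET a superuser session could also enable it directly (GUC_NO_SHOW_ALL keeps it out of SHOW ALL output):

```sql
SET citus.running_under_isolation_test TO true;
```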
Should " + "never be set to true on production systems "), + gettext_noop("for details of the tricks implemented, refer to the source code"), + &RunningUnderIsolationTest, + false, + PGC_SUSET, + GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.select_opens_transaction_block", gettext_noop("Open transaction blocks for SELECT commands"), diff --git a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql index f6b385472..5c7d6b07c 100644 --- a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql @@ -22,3 +22,5 @@ ON CONFLICT DO NOTHING; ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ADD COLUMN improvement_threshold float4 NOT NULL default 0; UPDATE pg_catalog.pg_dist_rebalance_strategy SET improvement_threshold = 0.5 WHERE name = 'by_disk_size'; + +#include "udfs/get_rebalance_progress/10.1-1.sql" diff --git a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql index f873ac4b2..dccc66d16 100644 --- a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql +++ b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql @@ -83,24 +83,7 @@ DROP EXTENSION IF EXISTS shard_rebalancer; #include "udfs/get_rebalance_table_shards_plan/9.0-1.sql" #include "udfs/replicate_table_shards/9.0-1.sql" #include "udfs/rebalance_table_shards/9.0-1.sql" - --- get_rebalance_progress returns the list of shard placement move operations along with --- their progressions for ongoing rebalance operations. --- -CREATE OR REPLACE FUNCTION get_rebalance_progress() - RETURNS TABLE(sessionid integer, - table_name regclass, - shardid bigint, - shard_size bigint, - sourcename text, - sourceport int, - targetname text, - targetport int, - progress bigint) - AS 'MODULE_PATHNAME' - LANGUAGE C STRICT; -COMMENT ON FUNCTION get_rebalance_progress() - IS 'provides progress information about the ongoing rebalance operations'; +#include "udfs/get_rebalance_progress/9.0-1.sql" DROP FUNCTION master_add_node(text, integer, integer, noderole, name); CREATE FUNCTION master_add_node(nodename text, diff --git a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql index c5a947553..c9fd0c895 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql @@ -36,3 +36,8 @@ DROP FUNCTION pg_catalog.citus_add_rebalance_strategy; #include "../udfs/citus_add_rebalance_strategy/9.2-1.sql" ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DROP COLUMN improvement_threshold; + +-- the migration for get_rebalance_progress from 9.0-1 was the first one, +-- so it doesn't have a DROP. This is why we DROP manually here. 
+DROP FUNCTION pg_catalog.get_rebalance_progress; +#include "../udfs/get_rebalance_progress/9.0-1.sql" diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/10.1-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/10.1-1.sql new file mode 100644 index 000000000..7df399ac1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/10.1-1.sql @@ -0,0 +1,18 @@ +DROP FUNCTION pg_catalog.get_rebalance_progress(); + +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint, + source_shard_size bigint, + target_shard_size bigint) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/9.0-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/9.0-1.sql new file mode 100644 index 000000000..00adeec1f --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/9.0-1.sql @@ -0,0 +1,17 @@ +-- get_rebalance_progress returns the list of shard placement move operations along with +-- their progressions for ongoing rebalance operations. +-- +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql new file mode 100644 index 000000000..7df399ac1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql @@ -0,0 +1,18 @@ +DROP FUNCTION pg_catalog.get_rebalance_progress(); + +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() + RETURNS TABLE(sessionid integer, + table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int, + progress bigint, + source_shard_size bigint, + target_shard_size bigint) + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; +COMMENT ON FUNCTION pg_catalog.get_rebalance_progress() + IS 'provides progress information about the ongoing rebalance operations'; diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 53c34cb21..ea1ff6827 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -105,7 +105,6 @@ typedef struct PlacementUpdateEventProgress int sourcePort; char targetName[255]; int targetPort; - uint64 shardSize; uint64 progress; } PlacementUpdateEventProgress; @@ -172,6 +171,7 @@ typedef struct RebalancePlanFunctions } RebalancePlanFunctions; extern int MaxRebalancerLoggedIgnoredMoves; +extern bool RunningUnderIsolationTest; /* External function declarations */ extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/isolation_shard_rebalancer.out b/src/test/regress/expected/isolation_shard_rebalancer.out index 4bb980230..2eab1d8b8 100644 --- 
a/src/test/regress/expected/isolation_shard_rebalancer.out +++ b/src/test/regress/expected/isolation_shard_rebalancer.out @@ -286,7 +286,7 @@ rebalance_table_shards step s2-rebalance-all: select rebalance_table_shards(); -ERROR: could not acquire the lock required to rebalance public.distributed_transaction_id_table +ERROR: could not acquire the lock required to rebalance public.colocated1 step s1-commit: COMMIT; @@ -308,7 +308,7 @@ replicate_table_shards step s2-rebalance-all: select rebalance_table_shards(); -ERROR: could not acquire the lock required to rebalance public.distributed_transaction_id_table +ERROR: could not acquire the lock required to rebalance public.colocated1 step s1-commit: COMMIT; @@ -374,7 +374,7 @@ rebalance_table_shards step s2-drain: select master_drain_node('localhost', 57638); -ERROR: could not acquire the lock required to move public.distributed_transaction_id_table +ERROR: could not acquire the lock required to move public.colocated1 step s1-commit: COMMIT; @@ -396,7 +396,7 @@ replicate_table_shards step s2-drain: select master_drain_node('localhost', 57638); -ERROR: could not acquire the lock required to move public.distributed_transaction_id_table +ERROR: could not acquire the lock required to move public.colocated1 step s1-commit: COMMIT; diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out new file mode 100644 index 000000000..073a9422e --- /dev/null +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -0,0 +1,127 @@ +Parsed test spec with 3 sessions + +starting permutation: s2-lock-1 s2-lock-2 s1-rebalance-c1 s3-progress s2-unlock-1 s3-progress s2-unlock-2 s3-progress s1-commit s3-progress +master_set_node_property + + +step s2-lock-1: + SELECT pg_advisory_lock(29279, 1); + +pg_advisory_lock + + +step s2-lock-2: + SELECT pg_advisory_lock(29279, 2); + +pg_advisory_lock + + +step s1-rebalance-c1: + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); + +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +colocated1 1500001 49152 localhost 57637 49152 localhost 57638 0 1 +colocated2 1500005 376832 localhost 57637 376832 localhost 57638 0 1 +colocated1 1500002 196608 localhost 57637 196608 localhost 57638 0 0 +colocated2 1500006 8192 localhost 57637 8192 localhost 57638 0 0 +step s2-unlock-1: + SELECT pg_advisory_unlock(29279, 1); + +pg_advisory_unlock + +t +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +colocated1 1500001 49152 localhost 57637 0 localhost 57638 49152 2 +colocated2 1500005 376832 localhost 57637 0 localhost 57638 376832 2 +colocated1 1500002 196608 localhost 57637 196608 localhost 57638 0 1 +colocated2 1500006 8192 localhost 57637 8192 localhost 57638 0 1 +step s2-unlock-2: + SELECT pg_advisory_unlock(29279, 
2); + +pg_advisory_unlock + +t +step s1-rebalance-c1: <... completed> +table_name shardid shard_size sourcename sourceport targetname targetport + +colocated1 1500001 0 localhost 57637 localhost 57638 +colocated2 1500005 0 localhost 57637 localhost 57638 +colocated1 1500002 0 localhost 57637 localhost 57638 +colocated2 1500006 0 localhost 57637 localhost 57638 +rebalance_table_shards + + +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +step s1-commit: + COMMIT; + +step s3-progress: + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress + +restore_isolation_tester_func + + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5654eb248..06f54321d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -564,20 +564,22 @@ SELECT * FROM print_extension_changes(); -- Snapshot of state at 10.1-1 ALTER EXTENSION citus UPDATE TO '10.1-1'; SELECT * FROM print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real) void | function citus_internal.columnar_ensure_objects_exist() void | function create_distributed_table(regclass,text,citus.distribution_type,text) void | + function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | | function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void | function citus_local_disk_space_stats() record | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void + | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | function worker_partitioned_relation_size(regclass) bigint | function worker_partitioned_relation_total_size(regclass) bigint | function worker_partitioned_table_size(regclass) bigint -(11 rows) +(13 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index c6d68a023..e9d668387 100644 
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -181,6 +181,9 @@ RESET citus.shard_count;
 RESET citus.shard_replication_factor;
 RESET citus.replication_model;
 -- Create a user to test multiuser usage of rebalancer functions
+-- We explicitly don't create this user on worker nodes yet, so we can
+-- test some more error handling. We create it there later.
+SET citus.enable_create_role_propagation TO OFF;
 CREATE USER testrole;
 NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
 HINT: Connect to worker nodes directly to manually create all necessary users and roles.
@@ -624,17 +627,48 @@ SELECT * FROM table_placements_per_node;
 -- Check that max_shard_moves limits number of move operations
 -- First check that we error if not table owner
+-- Turn on NOTICE messages
+SET ROLE testrole;
+-- Make sure that rebalance is stopped if source or target nodes are
+-- unresponsive.
+SELECT rebalance_table_shards('rebalance_test_table',
+    shard_transfer_mode:='block_writes');
+ERROR: target node localhost:xxxxx is not responsive
+\c - - - :worker_1_port
+SET citus.enable_create_role_propagation TO OFF;
+CREATE USER testrole;
+NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
+HINT: Connect to worker nodes directly to manually create all necessary users and roles.
+GRANT ALL ON SCHEMA public TO testrole; +\c - - - :master_port +SET client_min_messages TO WARNING; +SET citus.next_shard_id TO 123010; +SET ROLE testrole; +SELECT rebalance_table_shards('rebalance_test_table', + shard_transfer_mode:='block_writes'); +ERROR: must be owner of table rebalance_test_table +CONTEXT: while executing command on localhost:xxxxx RESET ROLE; +-- Confirm no moves took place at all during these errors +SELECT * FROM table_placements_per_node; + nodeport | logicalrelid | count +--------------------------------------------------------------------- + 57637 | rebalance_test_table | 1 + 57638 | rebalance_test_table | 5 +(2 rows) + SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); @@ -1002,7 +1036,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala -- Check that we can call this function SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size --------------------------------------------------------------------- (0 rows) @@ -1015,7 +1049,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size --------------------------------------------------------------------- (0 rows) @@ -1314,8 +1348,6 @@ RESET search_path; DROP SCHEMA test_schema_support CASCADE; \set VERBOSITY default REVOKE ALL ON SCHEMA public FROM testrole; -ERROR: role "testrole" does not exist -CONTEXT: while executing command on localhost:xxxxx DROP USER testrole; -- Test costs set citus.shard_count = 4; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 127bafd01..208d7e228 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -73,6 +73,7 @@ test: isolation_blocking_move_single_shard_commands_on_mx test: isolation_blocking_move_multi_shard_commands_on_mx test: isolation_shard_rebalancer test: isolation_rebalancer_deferred_drop +test: isolation_shard_rebalancer_progress # MX tests test: isolation_reference_on_mx diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index c0d354640..a2c5bb252 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -499,6 +499,8 @@ if($isolationtester) push(@pgOptions, "citus.metadata_sync_interval=1000"); push(@pgOptions, "citus.metadata_sync_retry_interval=100"); push(@pgOptions, "client_min_messages='warning'"); # pg12 introduced notice showing during isolation tests + push(@pgOptions, "citus.running_under_isolation_test=true"); + } # Add externally added options last, so they overwrite the default ones above diff --git a/src/test/regress/spec/isolation_shard_rebalancer.spec b/src/test/regress/spec/isolation_shard_rebalancer.spec index ab3e0e6fe..1aca39ca6 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer.spec @@ -3,9 
+3,9 @@ setup SELECT 1 FROM master_add_node('localhost', 57637); SELECT 1 FROM master_add_node('localhost', 57638); CREATE TABLE colocated1 (test_id integer NOT NULL, data text); - SELECT create_distributed_table('colocated1', 'test_id', 'hash'); + SELECT create_distributed_table('colocated1', 'test_id', 'hash', 'none'); CREATE TABLE colocated2 (test_id integer NOT NULL, data text); - SELECT create_distributed_table('colocated2', 'test_id', 'hash'); + SELECT create_distributed_table('colocated2', 'test_id', 'hash', 'colocated1'); CREATE TABLE non_colocated (test_id integer NOT NULL, data text); SELECT create_distributed_table('non_colocated', 'test_id', 'hash', 'none'); } diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec new file mode 100644 index 000000000..d2248292d --- /dev/null +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -0,0 +1,84 @@ +setup +{ + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); + select setval('pg_dist_shardid_seq', GREATEST(1500000, nextval('pg_dist_shardid_seq'))); + SET citus.shard_count TO 4; + SET citus.shard_replication_factor TO 1; + SELECT 1 FROM master_add_node('localhost', 57637); + SELECT master_set_node_property('localhost', 57638, 'shouldhaveshards', false); + CREATE TABLE colocated1 (test_id integer NOT NULL, data text); + SELECT create_distributed_table('colocated1', 'test_id', 'hash', 'none'); + CREATE TABLE colocated2 (test_id integer NOT NULL, data text); + SELECT create_distributed_table('colocated2', 'test_id', 'hash', 'colocated1'); + -- 1 and 3 are chosen so they go to shard 1 and 2 + INSERT INTO colocated1(test_id) SELECT 1 from generate_series(0, 1000) i; + INSERT INTO colocated2(test_id) SELECT 1 from generate_series(0, 10000) i; + INSERT INTO colocated1(test_id) SELECT 3 from generate_series(0, 5000) i; + select * from pg_dist_placement; + SELECT master_set_node_property('localhost', 57638, 'shouldhaveshards', true); +} + +teardown +{ + DROP TABLE colocated2; + DROP TABLE colocated1; + SELECT citus_internal.restore_isolation_tester_func(); +} + +session "s1" + +step "s1-rebalance-c1" +{ + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-lock-1" +{ + SELECT pg_advisory_lock(29279, 1); +} + +step "s2-lock-2" +{ + SELECT pg_advisory_lock(29279, 2); +} + +step "s2-unlock-1" +{ + SELECT pg_advisory_unlock(29279, 1); +} + +step "s2-unlock-2" +{ + SELECT pg_advisory_unlock(29279, 2); +} + +session "s3" + +step "s3-progress" +{ + set client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); +} + +permutation "s2-lock-1" "s2-lock-2" "s1-rebalance-c1" "s3-progress" "s2-unlock-1" "s3-progress" "s2-unlock-2" "s3-progress" "s1-commit" "s3-progress" diff --git a/src/test/regress/spec/shared_connection_waits.spec b/src/test/regress/spec/shared_connection_waits.spec index 80a8cb9e3..e7cd7eaca 100644 --- a/src/test/regress/spec/shared_connection_waits.spec +++ b/src/test/regress/spec/shared_connection_waits.spec @@ -22,6 +22,7 @@ teardown SELECT set_max_shared_pool_size(100); DROP FUNCTION wake_up_connection_pool_waiters(); DROP FUNCTION 
set_max_shared_pool_size(int);
+	DROP TABLE test;
 }
 
 session "s1"
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index dd6110f68..27146caaa 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -80,6 +80,9 @@ RESET citus.shard_replication_factor;
 RESET citus.replication_model;
 
 -- Create a user to test multiuser usage of rebalancer functions
+-- We explicitly don't create this user on worker nodes yet, so we can
+-- test some more error handling. We create it there later.
+SET citus.enable_create_role_propagation TO OFF;
 CREATE USER testrole;
 GRANT ALL ON SCHEMA public TO testrole;
 
@@ -436,11 +439,34 @@ SELECT * FROM table_placements_per_node;
 -- Check that max_shard_moves limits number of move operations
 -- First check that we error if not table owner
+-- Turn on NOTICE messages
+SET ROLE testrole;
+-- Make sure that rebalance is stopped if source or target nodes are
+-- unresponsive.
+SELECT rebalance_table_shards('rebalance_test_table',
+    shard_transfer_mode:='block_writes');
+\c - - - :worker_1_port
+SET citus.enable_create_role_propagation TO OFF;
+CREATE USER testrole;
+GRANT ALL ON SCHEMA public TO testrole;
+\c - - - :master_port
+SET client_min_messages TO WARNING;
+SET ROLE testrole;
+SELECT rebalance_table_shards('rebalance_test_table',
+    shard_transfer_mode:='block_writes');
+\c - - - :worker_2_port
+SET citus.enable_create_role_propagation TO OFF;
+CREATE USER testrole;
+GRANT ALL ON SCHEMA public TO testrole;
+\c - - - :master_port
+SET client_min_messages TO WARNING;
+SET citus.next_shard_id TO 123010;
 SET ROLE testrole;
 SELECT rebalance_table_shards('rebalance_test_table',
-    threshold := 0, max_shard_moves := 1,
     shard_transfer_mode:='block_writes');
 RESET ROLE;
+-- Confirm no moves took place at all during these errors
+SELECT * FROM table_placements_per_node;
 
 SELECT rebalance_table_shards('rebalance_test_table',
     threshold := 0, max_shard_moves := 1,