diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 244d9aea8..6a10f8d4b 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -79,14 +79,24 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery, bool failOnError, uint64 *tableSize); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); -static char * GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList); -static char * GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode); -static List * GenerateShardSizesQueryList(List *workerNodeList); +static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool + useShardMinMaxQuery); +static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, + List *citusTableIds, bool + useShardMinMaxQuery); +static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, + bool useShardMinMaxQuery); static void ErrorIfNotSuitableToGetSize(Oid relationId); static List * OpenConnectionToNodes(List *workerNodeList); static void ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); +static void AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId, + ShardInterval * + shardInterval, char *shardName, + char *quotedShardName); +static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval, + char *quotedShardName); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(citus_table_size); @@ -102,25 +112,16 @@ citus_shard_sizes(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); - List *workerNodeList = ActivePrimaryNodeList(NoLock); + List *allCitusTableIds = AllCitusTableIds(); - List *shardSizesQueryList = GenerateShardSizesQueryList(workerNodeList); + /* we don't need a distributed transaction 
here */ + bool useDistributedTransaction = false; - List *connectionList = OpenConnectionToNodes(workerNodeList); - FinishConnectionListEstablishment(connectionList); - - - /* send commands in parallel */ - for (int i = 0; i < list_length(connectionList); i++) - { - MultiConnection *connection = (MultiConnection *) list_nth(connectionList, i); - char *shardSizesQuery = (char *) list_nth(shardSizesQueryList, i); - int querySent = SendRemoteCommand(connection, shardSizesQuery); - if (querySent == 0) - { - ReportConnectionError(connection, WARNING); - } - } + /* we only want the shard sizes here so useShardMinMaxQuery parameter is false */ + bool useShardMinMaxQuery = false; + List *connectionList = SendShardStatisticsQueriesInParallel(allCitusTableIds, + useDistributedTransaction, + useShardMinMaxQuery); TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); @@ -225,6 +226,59 @@ citus_relation_size(PG_FUNCTION_ARGS) } +/* + * SendShardStatisticsQueriesInParallel generates query lists for obtaining shard + * statistics and then sends the commands in parallel by opening connections + * to available nodes. It returns the connection list. + */ +List * +SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction, + bool + useShardMinMaxQuery) +{ + List *workerNodeList = ActivePrimaryNodeList(NoLock); + + List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList, + citusTableIds, + useShardMinMaxQuery); + + List *connectionList = OpenConnectionToNodes(workerNodeList); + FinishConnectionListEstablishment(connectionList); + + if (useDistributedTransaction) + { + /* + * For now, in the case we want to include shard min and max values, we also + * want to update the entries in pg_dist_placement and pg_dist_shard with the + * latest statistics. 
In order to detect distributed deadlocks, we assign a + * distributed transaction ID to the current transaction + */ + UseCoordinatedTransaction(); + } + + /* send commands in parallel */ + for (int i = 0; i < list_length(connectionList); i++) + { + MultiConnection *connection = (MultiConnection *) list_nth(connectionList, i); + char *shardSizesQuery = (char *) list_nth(shardSizesQueryList, i); + + if (useDistributedTransaction) + { + /* run the size query in a distributed transaction */ + RemoteTransactionBeginIfNecessary(connection); + } + + int querySent = SendRemoteCommand(connection, shardSizesQuery); + + if (querySent == 0) + { + ReportConnectionError(connection, WARNING); + } + } + return connectionList; +} + + /* * OpenConnectionToNodes opens a single connection per node * for the given workerNodeList. @@ -250,20 +304,25 @@ OpenConnectionToNodes(List *workerNodeList) /* - * GenerateShardSizesQueryList generates a query per node that - * will return all shard_name, shard_size pairs from the node. 
+ * GenerateShardStatisticsQueryList generates a query per node that will return: + * - all shard_name, shard_size pairs from the node (if useShardMinMaxQuery is false) + * - all shard_id, shard_minvalue, shard_maxvalue, shard_size quadruples from the node (if true) */ static List * -GenerateShardSizesQueryList(List *workerNodeList) +GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, bool + useShardMinMaxQuery) { - List *shardSizesQueryList = NIL; + List *shardStatisticsQueryList = NIL; WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { - char *shardSizesQuery = GenerateAllShardNameAndSizeQueryForNode(workerNode); - shardSizesQueryList = lappend(shardSizesQueryList, shardSizesQuery); + char *shardStatisticsQuery = GenerateAllShardStatisticsQueryForNode(workerNode, + citusTableIds, + useShardMinMaxQuery); + shardStatisticsQueryList = lappend(shardStatisticsQueryList, + shardStatisticsQuery); } - return shardSizesQueryList; + return shardStatisticsQueryList; } @@ -572,37 +631,50 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery) /* - * GenerateAllShardNameAndSizeQueryForNode generates a query that returns all - * shard_name, shard_size pairs for the given node. 
+ * GenerateAllShardStatisticsQueryForNode generates a query that returns: + * - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false) + * - all shard_id, shard_minvalue, shard_maxvalue, shard_size quadruples (if true) */ static char * -GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode) +GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds, bool + useShardMinMaxQuery) { - List *allCitusTableIds = AllCitusTableIds(); - - StringInfo allShardNameAndSizeQuery = makeStringInfo(); + StringInfo allShardStatisticsQuery = makeStringInfo(); Oid relationId = InvalidOid; - foreach_oid(relationId, allCitusTableIds) + foreach_oid(relationId, citusTableIds) { List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId); - char *shardNameAndSizeQuery = - GenerateShardNameAndSizeQueryForShardList(shardIntervalsOnNode); - appendStringInfoString(allShardNameAndSizeQuery, shardNameAndSizeQuery); + char *shardStatisticsQuery = + GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode, + useShardMinMaxQuery); + appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery); } /* Add a dummy entry so that UNION ALL doesn't complain */ - appendStringInfo(allShardNameAndSizeQuery, "SELECT NULL::text, 0::bigint;"); - return allShardNameAndSizeQuery->data; + if (useShardMinMaxQuery) + { + /* 0 for shard_id, NULL for min, NULL for max, 0 for shard_size */ + appendStringInfo(allShardStatisticsQuery, + "SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;"); + } + else + { + /* NULL for shard_name, 0 for shard_size */ + appendStringInfo(allShardStatisticsQuery, "SELECT NULL::text, 0::bigint;"); + } + return allShardStatisticsQuery->data; } /* - * GenerateShardNameAndSizeQueryForShardList generates a SELECT shard_name - shard_size query to get - * size of multiple tables. 
+ * GenerateShardStatisticsQueryForShardList generates one of the two types of queries: + * - SELECT shard_name - shard_size (if useShardMinMaxQuery is false) + * - SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size (if true) */ static char * -GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList) +GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool + useShardMinMaxQuery) { StringInfo selectQuery = makeStringInfo(); @@ -618,8 +690,15 @@ GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList) char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); char *quotedShardName = quote_literal_cstr(shardQualifiedName); - appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName); - appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName); + if (useShardMinMaxQuery) + { + AppendShardSizeMinMaxQuery(selectQuery, shardId, shardInterval, shardName, + quotedShardName); + } + else + { + AppendShardSizeQuery(selectQuery, shardInterval, quotedShardName); + } appendStringInfo(selectQuery, " UNION ALL "); } @@ -627,6 +706,54 @@ GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList) } +/* + * AppendShardSizeMinMaxQuery appends a query in the following form to selectQuery + * SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size + */ +static void +AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId, + ShardInterval *shardInterval, char *shardName, + char *quotedShardName) +{ + if (IsCitusTableType(shardInterval->relationId, APPEND_DISTRIBUTED)) + { + /* fill in the partition column name */ + const uint32 unusedTableId = 1; + Var *partitionColumn = PartitionColumn(shardInterval->relationId, + unusedTableId); + char *partitionColumnName = get_attname(shardInterval->relationId, + partitionColumn->varattno, false); + appendStringInfo(selectQuery, + "SELECT " UINT64_FORMAT + " AS shard_id, min(%s)::text AS shard_minvalue, max(%s)::text AS shard_maxvalue, 
pg_relation_size(%s) AS shard_size FROM %s ", + shardId, partitionColumnName, + partitionColumnName, + quotedShardName, shardName); + } + else + { + /* we don't need to update min/max for non-append distributed tables because they don't change */ + appendStringInfo(selectQuery, + "SELECT " UINT64_FORMAT + " AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size ", + shardId, quotedShardName); + } +} + + +/* + * AppendShardSizeQuery appends a query in the following form to selectQuery + * SELECT shard_name, shard_size + */ +static void +AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval, + char *quotedShardName) +{ + appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName); + appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName); +} + + /* * ErrorIfNotSuitableToGetSize determines whether the table is suitable to find * its' size with internal functions. diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index a9f0dcf13..0813da540 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -32,7 +32,9 @@ #include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" #include "distributed/distributed_planner.h" +#include "distributed/foreign_key_relationship.h" #include "distributed/listutils.h" +#include "distributed/lock_graph.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" #include "distributed/metadata_utility.h" @@ -65,12 +67,22 @@ static List * RelationShardListForShardCreate(ShardInterval *shardInterval); static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardName, uint64 *shardSize, text **shardMinValue, text **shardMaxValue); +static void UpdateTableStatistics(Oid relationId); +static void 
ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList); +static void UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid + relationId, List *shardPlacementList, uint64 + shardSize, text *shardMinValue, + text *shardMaxValue); +static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId, + text **shardMinValue, text **shardMaxValue, + uint64 *shardSize); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_empty_shard); PG_FUNCTION_INFO_V1(master_append_table_to_shard); PG_FUNCTION_INFO_V1(citus_update_shard_statistics); PG_FUNCTION_INFO_V1(master_update_shard_statistics); +PG_FUNCTION_INFO_V1(citus_update_table_statistics); /* @@ -361,6 +373,23 @@ citus_update_shard_statistics(PG_FUNCTION_ARGS) } +/* + * citus_update_table_statistics updates metadata (shard size and shard min/max + * values) of the shards of the given table + */ +Datum +citus_update_table_statistics(PG_FUNCTION_ARGS) +{ + Oid distributedTableId = PG_GETARG_OID(0); + + CheckCitusVersion(ERROR); + + UpdateTableStatistics(distributedTableId); + + PG_RETURN_VOID(); +} + + /* * master_update_shard_statistics is a wrapper function for old UDF name. */ @@ -782,7 +811,6 @@ UpdateShardStatistics(int64 shardId) { ShardInterval *shardInterval = LoadShardInterval(shardId); Oid relationId = shardInterval->relationId; - char storageType = shardInterval->storageType; bool statsOK = false; uint64 shardSize = 0; text *minValue = NULL; @@ -825,17 +853,166 @@ UpdateShardStatistics(int64 shardId) errdetail("Setting shard statistics to NULL"))); } - /* make sure we don't process cancel signals */ - HOLD_INTERRUPTS(); + UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, shardPlacementList, + shardSize, minValue, maxValue); + return shardSize; +} - /* update metadata for each shard placement we appended to */ + +/* + * UpdateTableStatistics updates metadata (shard size and shard min/max values) + * of the shards of the given table. 
Follows a similar logic to citus_shard_sizes function. + */ +static void +UpdateTableStatistics(Oid relationId) +{ + List *citusTableIds = NIL; + citusTableIds = lappend_oid(citusTableIds, relationId); + + /* we want to use a distributed transaction here to detect distributed deadlocks */ + bool useDistributedTransaction = true; + + /* we also want shard min/max values for append distributed tables */ + bool useShardMinMaxQuery = true; + + List *connectionList = SendShardStatisticsQueriesInParallel(citusTableIds, + useDistributedTransaction, + useShardMinMaxQuery); + + ReceiveAndUpdateShardsSizeAndMinMax(connectionList); +} + + +/* + * ReceiveAndUpdateShardsSizeAndMinMax receives shard id, size + * and min max results from the given connection list, and updates + * respective entries in pg_dist_placement and pg_dist_shard + */ +static void +ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList) +{ + /* + * From the connection list, we will not get all the shards, but + * all the placements. We use a hash table to remember already visited shard ids + * since we update all the different placements of a shard id at once. 
+ */ + HTAB *alreadyVisitedShardPlacements = CreateOidVisitedHashSet(); + + MultiConnection *connection = NULL; + foreach_ptr(connection, connectionList) + { + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + continue; + } + + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + continue; + } + + int64 rowCount = PQntuples(result); + int64 colCount = PQnfields(result); + + /* Although it is not expected */ + if (colCount != UPDATE_SHARD_STATISTICS_COLUMN_COUNT) + { + ereport(WARNING, (errmsg("unexpected number of columns from " + "citus_update_table_statistics"))); + continue; + } + + for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + uint64 shardId = 0; + text *shardMinValue = NULL; + text *shardMaxValue = NULL; + uint64 shardSize = 0; + + if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardMinValue, + &shardMaxValue, &shardSize)) + { + /* this row has no valid shard statistics */ + continue; + } + + if (OidVisited(alreadyVisitedShardPlacements, shardId)) + { + /* We have already updated this placement list */ + continue; + } + + VisitOid(alreadyVisitedShardPlacements, shardId); + + ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid relationId = shardInterval->relationId; + List *shardPlacementList = ActiveShardPlacementList(shardId); + + UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, + shardPlacementList, shardSize, shardMinValue, + shardMaxValue); + } + PQclear(result); + ForgetResults(connection); + } + hash_destroy(alreadyVisitedShardPlacements); +} + + +/* + * ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult + * - it returns true if this row belongs to a valid shard + * - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID) + */ +static bool +ProcessShardStatisticsRow(PGresult *result, int64 
rowIndex, uint64 *shardId, + text **shardMinValue, text **shardMaxValue, uint64 *shardSize) +{ + *shardId = ParseIntField(result, rowIndex, 0); + + /* check for the dummy entries we put so that UNION ALL wouldn't complain */ + if (*shardId == INVALID_SHARD_ID) + { + /* this row has no valid shard statistics */ + return false; + } + + char *minValueResult = PQgetvalue(result, rowIndex, 1); + char *maxValueResult = PQgetvalue(result, rowIndex, 2); + *shardMinValue = cstring_to_text(minValueResult); + *shardMaxValue = cstring_to_text(maxValueResult); + *shardSize = ParseIntField(result, rowIndex, 3); + return true; +} + + +/* + * UpdateShardSizeAndMinMax updates the shardlength (shard size) of the given + * shard and its placements in pg_dist_placement, and updates the shard min value + * and shard max value of the given shard in pg_dist_shard if the relationId belongs + * to an append-distributed table + */ +static void +UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relationId, + List *shardPlacementList, uint64 shardSize, text *shardMinValue, + text *shardMaxValue) +{ + char storageType = shardInterval->storageType; + + ShardPlacement *placement = NULL; + + /* update metadata for each shard placement */ foreach_ptr(placement, shardPlacementList) { uint64 placementId = placement->placementId; int32 groupId = placement->groupId; DeleteShardPlacementRow(placementId); - InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, shardSize, + InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, + shardSize, groupId); } @@ -843,18 +1020,9 @@ UpdateShardStatistics(int64 shardId) if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { DeleteShardRow(shardId); - InsertShardRow(relationId, shardId, storageType, minValue, maxValue); + InsertShardRow(relationId, shardId, storageType, shardMinValue, + shardMaxValue); } - - if (QueryCancelPending) - { - ereport(WARNING, (errmsg("cancel requests are ignored during metadata 
update"))); - QueryCancelPending = false; - } - - RESUME_INTERRUPTS(); - - return shardSize; } diff --git a/src/backend/distributed/sql/citus--8.0-1.sql b/src/backend/distributed/sql/citus--8.0-1.sql index 5647e01fd..eced5cc60 100644 --- a/src/backend/distributed/sql/citus--8.0-1.sql +++ b/src/backend/distributed/sql/citus--8.0-1.sql @@ -1394,22 +1394,11 @@ COMMENT ON FUNCTION master_update_node(node_id int, new_node_name text, new_node -- shard statistics CREATE OR REPLACE FUNCTION master_update_table_statistics(relation regclass) -RETURNS VOID AS $$ -DECLARE - colocated_tables regclass[]; -BEGIN - SELECT get_colocated_table_array(relation) INTO colocated_tables; - - PERFORM - master_update_shard_statistics(shardid) - FROM - pg_dist_shard - WHERE - logicalrelid = ANY (colocated_tables); -END; -$$ LANGUAGE 'plpgsql'; -COMMENT ON FUNCTION master_update_table_statistics(regclass) - IS 'updates shard statistics of the given table and its colocated tables'; +RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_update_table_statistics$$; +COMMENT ON FUNCTION pg_catalog.master_update_table_statistics(regclass) + IS 'updates shard statistics of the given table'; CREATE OR REPLACE FUNCTION get_colocated_shard_array(bigint) RETURNS BIGINT[] diff --git a/src/backend/distributed/sql/udfs/citus_update_table_statistics/10.0-1.sql b/src/backend/distributed/sql/udfs/citus_update_table_statistics/10.0-1.sql index 13f4a680e..d4e07d24d 100644 --- a/src/backend/distributed/sql/udfs/citus_update_table_statistics/10.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_update_table_statistics/10.0-1.sql @@ -1,17 +1,6 @@ CREATE FUNCTION pg_catalog.citus_update_table_statistics(relation regclass) -RETURNS VOID AS $$ -DECLARE - colocated_tables regclass[]; -BEGIN - SELECT get_colocated_table_array(relation) INTO colocated_tables; - - PERFORM - master_update_shard_statistics(shardid) - FROM - pg_dist_shard - WHERE - logicalrelid = ANY (colocated_tables); -END; -$$ LANGUAGE 
'plpgsql'; + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_update_table_statistics$$; COMMENT ON FUNCTION pg_catalog.citus_update_table_statistics(regclass) - IS 'updates shard statistics of the given table and its colocated tables'; + IS 'updates shard statistics of the given table'; diff --git a/src/backend/distributed/sql/udfs/citus_update_table_statistics/latest.sql b/src/backend/distributed/sql/udfs/citus_update_table_statistics/latest.sql index 13f4a680e..d4e07d24d 100644 --- a/src/backend/distributed/sql/udfs/citus_update_table_statistics/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_update_table_statistics/latest.sql @@ -1,17 +1,6 @@ CREATE FUNCTION pg_catalog.citus_update_table_statistics(relation regclass) -RETURNS VOID AS $$ -DECLARE - colocated_tables regclass[]; -BEGIN - SELECT get_colocated_table_array(relation) INTO colocated_tables; - - PERFORM - master_update_shard_statistics(shardid) - FROM - pg_dist_shard - WHERE - logicalrelid = ANY (colocated_tables); -END; -$$ LANGUAGE 'plpgsql'; + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_update_table_statistics$$; COMMENT ON FUNCTION pg_catalog.citus_update_table_statistics(regclass) - IS 'updates shard statistics of the given table and its colocated tables'; + IS 'updates shard statistics of the given table'; diff --git a/src/backend/distributed/utils/foreign_key_relationship.c b/src/backend/distributed/utils/foreign_key_relationship.c index 8ef9aaf06..465846f81 100644 --- a/src/backend/distributed/utils/foreign_key_relationship.c +++ b/src/backend/distributed/utils/foreign_key_relationship.c @@ -100,9 +100,6 @@ static ForeignConstraintRelationshipNode * CreateOrFindNode(HTAB *adjacencyLists relid); static List * GetConnectedListHelper(ForeignConstraintRelationshipNode *node, bool isReferencing); -static HTAB * CreateOidVisitedHashSet(void); -static bool OidVisited(HTAB *oidVisitedMap, Oid oid); -static void VisitOid(HTAB *oidVisitedMap, Oid oid); static 
List * GetForeignConstraintRelationshipHelper(Oid relationId, bool isReferencing); @@ -442,7 +439,7 @@ GetConnectedListHelper(ForeignConstraintRelationshipNode *node, bool isReferenci * As hash_create allocates memory in heap, callers are responsible to call * hash_destroy when appropriate. */ -static HTAB * +HTAB * CreateOidVisitedHashSet(void) { HASHCTL info = { 0 }; @@ -464,7 +461,7 @@ CreateOidVisitedHashSet(void) /* * OidVisited returns true if given oid is visited according to given oid hash-set. */ -static bool +bool OidVisited(HTAB *oidVisitedMap, Oid oid) { bool found = false; @@ -476,7 +473,7 @@ OidVisited(HTAB *oidVisitedMap, Oid oid) /* * VisitOid sets given oid as visited in given hash-set. */ -static void +void VisitOid(HTAB *oidVisitedMap, Oid oid) { bool found = false; diff --git a/src/include/distributed/foreign_key_relationship.h b/src/include/distributed/foreign_key_relationship.h index a14fc26e3..3aa040d76 100644 --- a/src/include/distributed/foreign_key_relationship.h +++ b/src/include/distributed/foreign_key_relationship.h @@ -22,5 +22,8 @@ extern List * ReferencingRelationIdList(Oid relationId); extern void SetForeignConstraintRelationshipGraphInvalid(void); extern bool IsForeignConstraintRelationshipGraphValid(void); extern void ClearForeignConstraintRelationshipGraphContext(void); +extern HTAB * CreateOidVisitedHashSet(void); +extern bool OidVisited(HTAB *oidVisitedMap, Oid oid); +extern void VisitOid(HTAB *oidVisitedMap, Oid oid); #endif diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index dfb5aeb0f..4b453966f 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -36,6 +36,7 @@ #define CSTORE_TABLE_SIZE_FUNCTION "cstore_table_size(%s)" #define SHARD_SIZES_COLUMN_COUNT 2 +#define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4 /* In-memory representation of a typed tuple in pg_dist_shard. 
*/ typedef struct ShardInterval @@ -206,7 +207,6 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId); - /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, text *shardMinValue, text *shardMaxValue); @@ -264,5 +264,8 @@ extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray, int32 intervalTypeMod); extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn, Oid *intervalTypeId, int32 *intervalTypeMod); +extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool + useDistributedTransaction, bool + useShardMinMaxQuery); #endif /* METADATA_UTILITY_H */ diff --git a/src/test/regress/expected/citus_update_table_statistics.out b/src/test/regress/expected/citus_update_table_statistics.out new file mode 100644 index 000000000..617296c21 --- /dev/null +++ b/src/test/regress/expected/citus_update_table_statistics.out @@ -0,0 +1,190 @@ +-- +-- citus_update_table_statistics.sql +-- +-- Test citus_update_table_statistics function on both +-- hash and append distributed tables +-- This function updates shardlength, shardminvalue and shardmaxvalue +-- +SET citus.next_shard_id TO 981000; +SET citus.next_placement_id TO 982000; +SET citus.shard_count TO 8; +SET citus.shard_replication_factor TO 2; +-- test with a hash-distributed table +-- here we update only shardlength, not shardminvalue and shardmaxvalue +CREATE TABLE test_table_statistics_hash (id int); +SELECT create_distributed_table('test_table_statistics_hash', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- populate table +INSERT INTO test_table_statistics_hash SELECT i FROM generate_series(0, 10000)i; +-- originally 
shardlength (size of the shard) is zero +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue AS shardminvalue, + ds.shardmaxvalue AS shardmaxvalue +FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength = 0 +ORDER BY 2, 3; + tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue +--------------------------------------------------------------------- + test_table_statistics_hash | 981000 | 982000 | test_table_statistics_hash_981000 | -2147483648 | -1610612737 + test_table_statistics_hash | 981000 | 982001 | test_table_statistics_hash_981000 | -2147483648 | -1610612737 + test_table_statistics_hash | 981001 | 982002 | test_table_statistics_hash_981001 | -1610612736 | -1073741825 + test_table_statistics_hash | 981001 | 982003 | test_table_statistics_hash_981001 | -1610612736 | -1073741825 + test_table_statistics_hash | 981002 | 982004 | test_table_statistics_hash_981002 | -1073741824 | -536870913 + test_table_statistics_hash | 981002 | 982005 | test_table_statistics_hash_981002 | -1073741824 | -536870913 + test_table_statistics_hash | 981003 | 982006 | test_table_statistics_hash_981003 | -536870912 | -1 + test_table_statistics_hash | 981003 | 982007 | test_table_statistics_hash_981003 | -536870912 | -1 + test_table_statistics_hash | 981004 | 982008 | test_table_statistics_hash_981004 | 0 | 536870911 + test_table_statistics_hash | 981004 | 982009 | test_table_statistics_hash_981004 | 0 | 536870911 + test_table_statistics_hash | 981005 | 982010 | test_table_statistics_hash_981005 | 536870912 | 1073741823 + test_table_statistics_hash | 981005 | 982011 | test_table_statistics_hash_981005 | 536870912 | 1073741823 + test_table_statistics_hash | 981006 | 982012 | test_table_statistics_hash_981006 | 1073741824 | 
1610612735 + test_table_statistics_hash | 981006 | 982013 | test_table_statistics_hash_981006 | 1073741824 | 1610612735 + test_table_statistics_hash | 981007 | 982014 | test_table_statistics_hash_981007 | 1610612736 | 2147483647 + test_table_statistics_hash | 981007 | 982015 | test_table_statistics_hash_981007 | 1610612736 | 2147483647 +(16 rows) + +-- setting this to on in order to verify that we use a distributed transaction id +-- to run the size queries from different connections +-- this is going to help detect deadlocks +SET citus.log_remote_commands TO ON; +-- setting this to sequential in order to have a deterministic order +-- in the output of citus.log_remote_commands +SET citus.multi_shard_modify_mode TO sequential; +-- update table statistics and then check that shardlength has changed +-- but shardminvalue and shardmaxvalue stay the same because this is +-- a hash distributed table +SELECT citus_update_table_statistics('test_table_statistics_hash'); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, 
pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text 
AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + citus_update_table_statistics +--------------------------------------------------------------------- + +(1 row) + +RESET citus.log_remote_commands; +RESET citus.multi_shard_modify_mode; +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue as shardminvalue, + ds.shardmaxvalue as shardmaxvalue +FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength > 0 +ORDER BY 2, 3; + tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue +--------------------------------------------------------------------- + test_table_statistics_hash | 981000 | 982000 | test_table_statistics_hash_981000 | -2147483648 | -1610612737 + test_table_statistics_hash | 981000 | 982001 | test_table_statistics_hash_981000 | -2147483648 | -1610612737 + test_table_statistics_hash | 981001 | 982002 | test_table_statistics_hash_981001 | -1610612736 | -1073741825 + test_table_statistics_hash | 981001 | 982003 | test_table_statistics_hash_981001 | -1610612736 | -1073741825 + test_table_statistics_hash | 981002 | 982004 | test_table_statistics_hash_981002 | -1073741824 | -536870913 + test_table_statistics_hash | 981002 | 982005 | 
test_table_statistics_hash_981002 | -1073741824 | -536870913 + test_table_statistics_hash | 981003 | 982006 | test_table_statistics_hash_981003 | -536870912 | -1 + test_table_statistics_hash | 981003 | 982007 | test_table_statistics_hash_981003 | -536870912 | -1 + test_table_statistics_hash | 981004 | 982008 | test_table_statistics_hash_981004 | 0 | 536870911 + test_table_statistics_hash | 981004 | 982009 | test_table_statistics_hash_981004 | 0 | 536870911 + test_table_statistics_hash | 981005 | 982010 | test_table_statistics_hash_981005 | 536870912 | 1073741823 + test_table_statistics_hash | 981005 | 982011 | test_table_statistics_hash_981005 | 536870912 | 1073741823 + test_table_statistics_hash | 981006 | 982012 | test_table_statistics_hash_981006 | 1073741824 | 1610612735 + test_table_statistics_hash | 981006 | 982013 | test_table_statistics_hash_981006 | 1073741824 | 1610612735 + test_table_statistics_hash | 981007 | 982014 | test_table_statistics_hash_981007 | 1610612736 | 2147483647 + test_table_statistics_hash | 981007 | 982015 | test_table_statistics_hash_981007 | 1610612736 | 2147483647 +(16 rows) + +-- check with an append-distributed table +-- here we update shardlength, shardminvalue and shardmaxvalue +CREATE TABLE test_table_statistics_append (id int); +SELECT create_distributed_table('test_table_statistics_append', 'id', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COPY test_table_statistics_append FROM PROGRAM 'echo 0 && echo 1 && echo 2 && echo 3' WITH CSV; +COPY test_table_statistics_append FROM PROGRAM 'echo 4 && echo 5 && echo 6 && echo 7' WITH CSV; +-- originally shardminvalue and shardmaxvalue will be 0,3 and 4, 7 +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue as shardminvalue, + ds.shardmaxvalue as shardmaxvalue +FROM 
pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append') +ORDER BY 2, 3; + tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue +--------------------------------------------------------------------- + test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 0 | 3 + test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 0 | 3 + test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 4 | 7 + test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 4 | 7 +(4 rows) + +-- delete some data to change the shardminvalue of each shard +DELETE FROM test_table_statistics_append WHERE id = 0 OR id = 4; +SET citus.log_remote_commands TO ON; +SET citus.multi_shard_modify_mode TO sequential; +-- update table statistics and then check that shardminvalue has changed +-- shardlength (shardsize) is still 8192 since there is very little data +SELECT citus_update_table_statistics('test_table_statistics_append'); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT
assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + citus_update_table_statistics +--------------------------------------------------------------------- + +(1 row) + +RESET citus.log_remote_commands; +RESET citus.multi_shard_modify_mode; +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue as shardminvalue, + ds.shardmaxvalue as shardmaxvalue +FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append') +ORDER BY 2, 3; + tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue +--------------------------------------------------------------------- + test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 1 | 3 + test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 1 | 3 + test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 5 | 7 + test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 5
| 7 +(4 rows) + +DROP TABLE test_table_statistics_hash, test_table_statistics_append; +RESET citus.shard_count; +RESET citus.shard_replication_factor; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index d967b4a5e..7fc0453f5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -97,6 +97,11 @@ test: tableam test: propagate_statistics test: pg13_propagate_statistics +# ---------- +# Test for updating table statistics +# ---------- +test: citus_update_table_statistics + # ---------- # Miscellaneous tests to check our query planning behavior # ---------- diff --git a/src/test/regress/sql/citus_update_table_statistics.sql b/src/test/regress/sql/citus_update_table_statistics.sql new file mode 100644 index 000000000..95515d4a3 --- /dev/null +++ b/src/test/regress/sql/citus_update_table_statistics.sql @@ -0,0 +1,107 @@ +-- +-- citus_update_table_statistics.sql +-- +-- Test citus_update_table_statistics function on both +-- hash and append distributed tables +-- This function updates shardlength, shardminvalue and shardmaxvalue +-- +SET citus.next_shard_id TO 981000; +SET citus.next_placement_id TO 982000; +SET citus.shard_count TO 8; +SET citus.shard_replication_factor TO 2; + +-- test with a hash-distributed table +-- here we update only shardlength, not shardminvalue and shardmaxvalue +CREATE TABLE test_table_statistics_hash (id int); +SELECT create_distributed_table('test_table_statistics_hash', 'id'); + +-- populate table +INSERT INTO test_table_statistics_hash SELECT i FROM generate_series(0, 10000)i; + +-- originally shardlength (size of the shard) is zero +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue AS shardminvalue, + ds.shardmaxvalue AS shardmaxvalue +FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE
ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength = 0 +ORDER BY 2, 3; + +-- setting this to on in order to verify that we use a distributed transaction id +-- to run the size queries from different connections +-- this is going to help detect deadlocks +SET citus.log_remote_commands TO ON; + +-- setting this to sequential in order to have a deterministic order +-- in the output of citus.log_remote_commands +SET citus.multi_shard_modify_mode TO sequential; + +-- update table statistics and then check that shardlength has changed +-- but shardminvalue and shardmaxvalue stay the same because this is +-- a hash distributed table + +SELECT citus_update_table_statistics('test_table_statistics_hash'); + +RESET citus.log_remote_commands; +RESET citus.multi_shard_modify_mode; + +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue as shardminvalue, + ds.shardmaxvalue as shardmaxvalue +FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength > 0 +ORDER BY 2, 3; + +-- check with an append-distributed table +-- here we update shardlength, shardminvalue and shardmaxvalue +CREATE TABLE test_table_statistics_append (id int); +SELECT create_distributed_table('test_table_statistics_append', 'id', 'append'); +COPY test_table_statistics_append FROM PROGRAM 'echo 0 && echo 1 && echo 2 && echo 3' WITH CSV; +COPY test_table_statistics_append FROM PROGRAM 'echo 4 && echo 5 && echo 6 && echo 7' WITH CSV; + +-- originally shardminvalue and shardmaxvalue will be 0,3 and 4, 7 +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue as shardminvalue, + ds.shardmaxvalue as shardmaxvalue
+FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append') +ORDER BY 2, 3; + +-- delete some data to change the shardminvalue of each shard +DELETE FROM test_table_statistics_append WHERE id = 0 OR id = 4; + +SET citus.log_remote_commands TO ON; +SET citus.multi_shard_modify_mode TO sequential; + +-- update table statistics and then check that shardminvalue has changed +-- shardlength (shardsize) is still 8192 since there is very little data +SELECT citus_update_table_statistics('test_table_statistics_append'); + +RESET citus.log_remote_commands; +RESET citus.multi_shard_modify_mode; + +SELECT + ds.logicalrelid::regclass::text AS tablename, + ds.shardid AS shardid, + dsp.placementid AS placementid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue as shardminvalue, + ds.shardmaxvalue as shardmaxvalue +FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) +WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append') +ORDER BY 2, 3; + +DROP TABLE test_table_statistics_hash, test_table_statistics_append; +RESET citus.shard_count; +RESET citus.shard_replication_factor;