mirror of https://github.com/citusdata/citus.git
Reimplement citus_update_table_statistics to detect dist. deadlocks (#4752)
* Reimplement citus_update_table_statistics
* Update stats for the given table not colocation group
* Add tests for reimplemented citus_update_table_statistics
* Use coordinated transaction, merge with citus_shard_sizes functions
* Update the old master_update_table_statistics as well
(cherry picked from commit 2f30614fe3)
pull/5009/head
parent
28f1c2129d
commit
445291d94b
|
@ -79,14 +79,24 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||||
char *sizeQuery, bool failOnError,
|
char *sizeQuery, bool failOnError,
|
||||||
uint64 *tableSize);
|
uint64 *tableSize);
|
||||||
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
||||||
static char * GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList);
|
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
||||||
static char * GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode);
|
useShardMinMaxQuery);
|
||||||
static List * GenerateShardSizesQueryList(List *workerNodeList);
|
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
|
||||||
|
List *citusTableIds, bool
|
||||||
|
useShardMinMaxQuery);
|
||||||
|
static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds,
|
||||||
|
bool useShardMinMaxQuery);
|
||||||
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
||||||
static List * OpenConnectionToNodes(List *workerNodeList);
|
static List * OpenConnectionToNodes(List *workerNodeList);
|
||||||
static void ReceiveShardNameAndSizeResults(List *connectionList,
|
static void ReceiveShardNameAndSizeResults(List *connectionList,
|
||||||
Tuplestorestate *tupleStore,
|
Tuplestorestate *tupleStore,
|
||||||
TupleDesc tupleDescriptor);
|
TupleDesc tupleDescriptor);
|
||||||
|
static void AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
|
||||||
|
ShardInterval *
|
||||||
|
shardInterval, char *shardName,
|
||||||
|
char *quotedShardName);
|
||||||
|
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
|
||||||
|
char *quotedShardName);
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(citus_table_size);
|
PG_FUNCTION_INFO_V1(citus_table_size);
|
||||||
|
@ -102,25 +112,16 @@ citus_shard_sizes(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
List *workerNodeList = ActivePrimaryNodeList(NoLock);
|
List *allCitusTableIds = AllCitusTableIds();
|
||||||
|
|
||||||
List *shardSizesQueryList = GenerateShardSizesQueryList(workerNodeList);
|
/* we don't need a distributed transaction here */
|
||||||
|
bool useDistributedTransaction = false;
|
||||||
|
|
||||||
List *connectionList = OpenConnectionToNodes(workerNodeList);
|
/* we only want the shard sizes here so useShardMinMaxQuery parameter is false */
|
||||||
FinishConnectionListEstablishment(connectionList);
|
bool useShardMinMaxQuery = false;
|
||||||
|
List *connectionList = SendShardStatisticsQueriesInParallel(allCitusTableIds,
|
||||||
|
useDistributedTransaction,
|
||||||
/* send commands in parallel */
|
useShardMinMaxQuery);
|
||||||
for (int i = 0; i < list_length(connectionList); i++)
|
|
||||||
{
|
|
||||||
MultiConnection *connection = (MultiConnection *) list_nth(connectionList, i);
|
|
||||||
char *shardSizesQuery = (char *) list_nth(shardSizesQueryList, i);
|
|
||||||
int querySent = SendRemoteCommand(connection, shardSizesQuery);
|
|
||||||
if (querySent == 0)
|
|
||||||
{
|
|
||||||
ReportConnectionError(connection, WARNING);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||||
|
@ -225,6 +226,59 @@ citus_relation_size(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SendShardStatisticsQueriesInParallel generates query lists for obtaining shard
|
||||||
|
* statistics and then sends the commands in parallel by opening connections
|
||||||
|
* to available nodes. It returns the connection list.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction,
|
||||||
|
bool
|
||||||
|
useShardMinMaxQuery)
|
||||||
|
{
|
||||||
|
List *workerNodeList = ActivePrimaryNodeList(NoLock);
|
||||||
|
|
||||||
|
List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList,
|
||||||
|
citusTableIds,
|
||||||
|
useShardMinMaxQuery);
|
||||||
|
|
||||||
|
List *connectionList = OpenConnectionToNodes(workerNodeList);
|
||||||
|
FinishConnectionListEstablishment(connectionList);
|
||||||
|
|
||||||
|
if (useDistributedTransaction)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* For now, in the case we want to include shard min and max values, we also
|
||||||
|
* want to update the entries in pg_dist_placement and pg_dist_shard with the
|
||||||
|
* latest statistics. In order to detect distributed deadlocks, we assign a
|
||||||
|
* distributed transaction ID to the current transaction
|
||||||
|
*/
|
||||||
|
UseCoordinatedTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* send commands in parallel */
|
||||||
|
for (int i = 0; i < list_length(connectionList); i++)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = (MultiConnection *) list_nth(connectionList, i);
|
||||||
|
char *shardSizesQuery = (char *) list_nth(shardSizesQueryList, i);
|
||||||
|
|
||||||
|
if (useDistributedTransaction)
|
||||||
|
{
|
||||||
|
/* run the size query in a distributed transaction */
|
||||||
|
RemoteTransactionBeginIfNecessary(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
int querySent = SendRemoteCommand(connection, shardSizesQuery);
|
||||||
|
|
||||||
|
if (querySent == 0)
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return connectionList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* OpenConnectionToNodes opens a single connection per node
|
* OpenConnectionToNodes opens a single connection per node
|
||||||
* for the given workerNodeList.
|
* for the given workerNodeList.
|
||||||
|
@ -250,20 +304,25 @@ OpenConnectionToNodes(List *workerNodeList)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateShardSizesQueryList generates a query per node that
|
* GenerateShardStatisticsQueryList generates a query per node that will return:
|
||||||
* will return all shard_name, shard_size pairs from the node.
|
* - all shard_name, shard_size pairs from the node (if useShardMinMaxQuery is false)
|
||||||
|
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quadruples from the node (if true)
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
GenerateShardSizesQueryList(List *workerNodeList)
|
GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, bool
|
||||||
|
useShardMinMaxQuery)
|
||||||
{
|
{
|
||||||
List *shardSizesQueryList = NIL;
|
List *shardStatisticsQueryList = NIL;
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
{
|
{
|
||||||
char *shardSizesQuery = GenerateAllShardNameAndSizeQueryForNode(workerNode);
|
char *shardStatisticsQuery = GenerateAllShardStatisticsQueryForNode(workerNode,
|
||||||
shardSizesQueryList = lappend(shardSizesQueryList, shardSizesQuery);
|
citusTableIds,
|
||||||
|
useShardMinMaxQuery);
|
||||||
|
shardStatisticsQueryList = lappend(shardStatisticsQueryList,
|
||||||
|
shardStatisticsQuery);
|
||||||
}
|
}
|
||||||
return shardSizesQueryList;
|
return shardStatisticsQueryList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -572,37 +631,50 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateAllShardNameAndSizeQueryForNode generates a query that returns all
|
* GenerateAllShardStatisticsQueryForNode generates a query that returns:
|
||||||
* shard_name, shard_size pairs for the given node.
|
* - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false)
|
||||||
|
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quadruples (if true)
|
||||||
*/
|
*/
|
||||||
static char *
|
static char *
|
||||||
GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode)
|
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds, bool
|
||||||
|
useShardMinMaxQuery)
|
||||||
{
|
{
|
||||||
List *allCitusTableIds = AllCitusTableIds();
|
StringInfo allShardStatisticsQuery = makeStringInfo();
|
||||||
|
|
||||||
StringInfo allShardNameAndSizeQuery = makeStringInfo();
|
|
||||||
|
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
foreach_oid(relationId, allCitusTableIds)
|
foreach_oid(relationId, citusTableIds)
|
||||||
{
|
{
|
||||||
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
|
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
|
||||||
char *shardNameAndSizeQuery =
|
char *shardStatisticsQuery =
|
||||||
GenerateShardNameAndSizeQueryForShardList(shardIntervalsOnNode);
|
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode,
|
||||||
appendStringInfoString(allShardNameAndSizeQuery, shardNameAndSizeQuery);
|
useShardMinMaxQuery);
|
||||||
|
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a dummy entry so that UNION ALL doesn't complain */
|
/* Add a dummy entry so that UNION ALL doesn't complain */
|
||||||
appendStringInfo(allShardNameAndSizeQuery, "SELECT NULL::text, 0::bigint;");
|
if (useShardMinMaxQuery)
|
||||||
return allShardNameAndSizeQuery->data;
|
{
|
||||||
|
/* 0 for shard_id, NULL for min, NULL for max, 0 for shard_size */
|
||||||
|
appendStringInfo(allShardStatisticsQuery,
|
||||||
|
"SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* NULL for shard_name, 0 for shard_size */
|
||||||
|
appendStringInfo(allShardStatisticsQuery, "SELECT NULL::text, 0::bigint;");
|
||||||
|
}
|
||||||
|
return allShardStatisticsQuery->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateShardNameAndSizeQueryForShardList generates a SELECT shard_name - shard_size query to get
|
* GenerateShardStatisticsQueryForShardList generates one of the two types of queries:
|
||||||
* size of multiple tables.
|
* - SELECT shard_name, shard_size (if useShardMinMaxQuery is false)
|
||||||
|
* - SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size (if true)
|
||||||
*/
|
*/
|
||||||
static char *
|
static char *
|
||||||
GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList)
|
GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
||||||
|
useShardMinMaxQuery)
|
||||||
{
|
{
|
||||||
StringInfo selectQuery = makeStringInfo();
|
StringInfo selectQuery = makeStringInfo();
|
||||||
|
|
||||||
|
@ -618,8 +690,15 @@ GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList)
|
||||||
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
|
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
|
||||||
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
|
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
|
||||||
|
|
||||||
appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName);
|
if (useShardMinMaxQuery)
|
||||||
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
|
{
|
||||||
|
AppendShardSizeMinMaxQuery(selectQuery, shardId, shardInterval, shardName,
|
||||||
|
quotedShardName);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
AppendShardSizeQuery(selectQuery, shardInterval, quotedShardName);
|
||||||
|
}
|
||||||
appendStringInfo(selectQuery, " UNION ALL ");
|
appendStringInfo(selectQuery, " UNION ALL ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -627,6 +706,54 @@ GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AppendShardSizeMinMaxQuery appends a query in the following form to selectQuery
|
||||||
|
* SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
|
||||||
|
ShardInterval *shardInterval, char *shardName,
|
||||||
|
char *quotedShardName)
|
||||||
|
{
|
||||||
|
if (IsCitusTableType(shardInterval->relationId, APPEND_DISTRIBUTED))
|
||||||
|
{
|
||||||
|
/* fill in the partition column name */
|
||||||
|
const uint32 unusedTableId = 1;
|
||||||
|
Var *partitionColumn = PartitionColumn(shardInterval->relationId,
|
||||||
|
unusedTableId);
|
||||||
|
char *partitionColumnName = get_attname(shardInterval->relationId,
|
||||||
|
partitionColumn->varattno, false);
|
||||||
|
appendStringInfo(selectQuery,
|
||||||
|
"SELECT " UINT64_FORMAT
|
||||||
|
" AS shard_id, min(%s)::text AS shard_minvalue, max(%s)::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size FROM %s ",
|
||||||
|
shardId, partitionColumnName,
|
||||||
|
partitionColumnName,
|
||||||
|
quotedShardName, shardName);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* we don't need to update min/max for non-append distributed tables because they don't change */
|
||||||
|
appendStringInfo(selectQuery,
|
||||||
|
"SELECT " UINT64_FORMAT
|
||||||
|
" AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size ",
|
||||||
|
shardId, quotedShardName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AppendShardSizeQuery appends a query in the following form to selectQuery
|
||||||
|
* SELECT shard_name, shard_size
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
|
||||||
|
char *quotedShardName)
|
||||||
|
{
|
||||||
|
appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName);
|
||||||
|
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfNotSuitableToGetSize determines whether the table is suitable to find
|
* ErrorIfNotSuitableToGetSize determines whether the table is suitable to find
|
||||||
* its size with internal functions.
|
* its size with internal functions.
|
||||||
|
|
|
@ -32,7 +32,9 @@
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/distributed_planner.h"
|
#include "distributed/distributed_planner.h"
|
||||||
|
#include "distributed/foreign_key_relationship.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
#include "distributed/lock_graph.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/metadata_utility.h"
|
#include "distributed/metadata_utility.h"
|
||||||
|
@ -65,12 +67,22 @@ static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
|
||||||
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
|
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
|
||||||
const char *shardName, uint64 *shardSize,
|
const char *shardName, uint64 *shardSize,
|
||||||
text **shardMinValue, text **shardMaxValue);
|
text **shardMinValue, text **shardMaxValue);
|
||||||
|
static void UpdateTableStatistics(Oid relationId);
|
||||||
|
static void ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList);
|
||||||
|
static void UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid
|
||||||
|
relationId, List *shardPlacementList, uint64
|
||||||
|
shardSize, text *shardMinValue,
|
||||||
|
text *shardMaxValue);
|
||||||
|
static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
||||||
|
text **shardMinValue, text **shardMaxValue,
|
||||||
|
uint64 *shardSize);
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(master_create_empty_shard);
|
PG_FUNCTION_INFO_V1(master_create_empty_shard);
|
||||||
PG_FUNCTION_INFO_V1(master_append_table_to_shard);
|
PG_FUNCTION_INFO_V1(master_append_table_to_shard);
|
||||||
PG_FUNCTION_INFO_V1(citus_update_shard_statistics);
|
PG_FUNCTION_INFO_V1(citus_update_shard_statistics);
|
||||||
PG_FUNCTION_INFO_V1(master_update_shard_statistics);
|
PG_FUNCTION_INFO_V1(master_update_shard_statistics);
|
||||||
|
PG_FUNCTION_INFO_V1(citus_update_table_statistics);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -361,6 +373,23 @@ citus_update_shard_statistics(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_update_table_statistics updates metadata (shard size and shard min/max
|
||||||
|
* values) of the shards of the given table
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
citus_update_table_statistics(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
Oid distributedTableId = PG_GETARG_OID(0);
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
UpdateTableStatistics(distributedTableId);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* master_update_shard_statistics is a wrapper function for old UDF name.
|
* master_update_shard_statistics is a wrapper function for old UDF name.
|
||||||
*/
|
*/
|
||||||
|
@ -782,7 +811,6 @@ UpdateShardStatistics(int64 shardId)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
Oid relationId = shardInterval->relationId;
|
Oid relationId = shardInterval->relationId;
|
||||||
char storageType = shardInterval->storageType;
|
|
||||||
bool statsOK = false;
|
bool statsOK = false;
|
||||||
uint64 shardSize = 0;
|
uint64 shardSize = 0;
|
||||||
text *minValue = NULL;
|
text *minValue = NULL;
|
||||||
|
@ -825,17 +853,166 @@ UpdateShardStatistics(int64 shardId)
|
||||||
errdetail("Setting shard statistics to NULL")));
|
errdetail("Setting shard statistics to NULL")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* make sure we don't process cancel signals */
|
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, shardPlacementList,
|
||||||
HOLD_INTERRUPTS();
|
shardSize, minValue, maxValue);
|
||||||
|
return shardSize;
|
||||||
|
}
|
||||||
|
|
||||||
/* update metadata for each shard placement we appended to */
|
|
||||||
|
/*
|
||||||
|
* UpdateTableStatistics updates metadata (shard size and shard min/max values)
|
||||||
|
* of the shards of the given table. Follows a similar logic to citus_shard_sizes function.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
UpdateTableStatistics(Oid relationId)
|
||||||
|
{
|
||||||
|
List *citusTableIds = NIL;
|
||||||
|
citusTableIds = lappend_oid(citusTableIds, relationId);
|
||||||
|
|
||||||
|
/* we want to use a distributed transaction here to detect distributed deadlocks */
|
||||||
|
bool useDistributedTransaction = true;
|
||||||
|
|
||||||
|
/* we also want shard min/max values for append distributed tables */
|
||||||
|
bool useShardMinMaxQuery = true;
|
||||||
|
|
||||||
|
List *connectionList = SendShardStatisticsQueriesInParallel(citusTableIds,
|
||||||
|
useDistributedTransaction,
|
||||||
|
useShardMinMaxQuery);
|
||||||
|
|
||||||
|
ReceiveAndUpdateShardsSizeAndMinMax(connectionList);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReceiveAndUpdateShardsSizeAndMinMax receives shard id, size
|
||||||
|
* and min max results from the given connection list, and updates
|
||||||
|
* respective entries in pg_dist_placement and pg_dist_shard
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* From the connection list, we will not get all the shards, but
|
||||||
|
* all the placements. We use a hash table to remember already visited shard ids
|
||||||
|
* since we update all the different placements of a shard id at once.
|
||||||
|
*/
|
||||||
|
HTAB *alreadyVisitedShardPlacements = CreateOidVisitedHashSet();
|
||||||
|
|
||||||
|
MultiConnection *connection = NULL;
|
||||||
|
foreach_ptr(connection, connectionList)
|
||||||
|
{
|
||||||
|
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
|
if (!IsResponseOK(result))
|
||||||
|
{
|
||||||
|
ReportResultError(connection, result, WARNING);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64 rowCount = PQntuples(result);
|
||||||
|
int64 colCount = PQnfields(result);
|
||||||
|
|
||||||
|
/* a column count mismatch is not expected, but skip this result if it happens */
|
||||||
|
if (colCount != UPDATE_SHARD_STATISTICS_COLUMN_COUNT)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||||
|
"citus_update_table_statistics")));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||||
|
{
|
||||||
|
uint64 shardId = 0;
|
||||||
|
text *shardMinValue = NULL;
|
||||||
|
text *shardMaxValue = NULL;
|
||||||
|
uint64 shardSize = 0;
|
||||||
|
|
||||||
|
if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardMinValue,
|
||||||
|
&shardMaxValue, &shardSize))
|
||||||
|
{
|
||||||
|
/* this row has no valid shard statistics */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OidVisited(alreadyVisitedShardPlacements, shardId))
|
||||||
|
{
|
||||||
|
/* We have already updated this placement list */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
VisitOid(alreadyVisitedShardPlacements, shardId);
|
||||||
|
|
||||||
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
|
Oid relationId = shardInterval->relationId;
|
||||||
|
List *shardPlacementList = ActiveShardPlacementList(shardId);
|
||||||
|
|
||||||
|
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId,
|
||||||
|
shardPlacementList, shardSize, shardMinValue,
|
||||||
|
shardMaxValue);
|
||||||
|
}
|
||||||
|
PQclear(result);
|
||||||
|
ForgetResults(connection);
|
||||||
|
}
|
||||||
|
hash_destroy(alreadyVisitedShardPlacements);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult
|
||||||
|
* - it returns true if this row belongs to a valid shard
|
||||||
|
* - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID)
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
||||||
|
text **shardMinValue, text **shardMaxValue, uint64 *shardSize)
|
||||||
|
{
|
||||||
|
*shardId = ParseIntField(result, rowIndex, 0);
|
||||||
|
|
||||||
|
/* check for the dummy entries we put so that UNION ALL wouldn't complain */
|
||||||
|
if (*shardId == INVALID_SHARD_ID)
|
||||||
|
{
|
||||||
|
/* this row has no valid shard statistics */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *minValueResult = PQgetvalue(result, rowIndex, 1);
|
||||||
|
char *maxValueResult = PQgetvalue(result, rowIndex, 2);
|
||||||
|
*shardMinValue = cstring_to_text(minValueResult);
|
||||||
|
*shardMaxValue = cstring_to_text(maxValueResult);
|
||||||
|
*shardSize = ParseIntField(result, rowIndex, 3);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UpdateShardSizeAndMinMax updates the shardlength (shard size) of the given
|
||||||
|
* shard and its placements in pg_dist_placement, and updates the shard min value
|
||||||
|
* and shard max value of the given shard in pg_dist_shard if the relationId belongs
|
||||||
|
* to an append-distributed table
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relationId,
|
||||||
|
List *shardPlacementList, uint64 shardSize, text *shardMinValue,
|
||||||
|
text *shardMaxValue)
|
||||||
|
{
|
||||||
|
char storageType = shardInterval->storageType;
|
||||||
|
|
||||||
|
ShardPlacement *placement = NULL;
|
||||||
|
|
||||||
|
/* update metadata for each shard placement */
|
||||||
foreach_ptr(placement, shardPlacementList)
|
foreach_ptr(placement, shardPlacementList)
|
||||||
{
|
{
|
||||||
uint64 placementId = placement->placementId;
|
uint64 placementId = placement->placementId;
|
||||||
int32 groupId = placement->groupId;
|
int32 groupId = placement->groupId;
|
||||||
|
|
||||||
DeleteShardPlacementRow(placementId);
|
DeleteShardPlacementRow(placementId);
|
||||||
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, shardSize,
|
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE,
|
||||||
|
shardSize,
|
||||||
groupId);
|
groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -843,18 +1020,9 @@ UpdateShardStatistics(int64 shardId)
|
||||||
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
DeleteShardRow(shardId);
|
DeleteShardRow(shardId);
|
||||||
InsertShardRow(relationId, shardId, storageType, minValue, maxValue);
|
InsertShardRow(relationId, shardId, storageType, shardMinValue,
|
||||||
|
shardMaxValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QueryCancelPending)
|
|
||||||
{
|
|
||||||
ereport(WARNING, (errmsg("cancel requests are ignored during metadata update")));
|
|
||||||
QueryCancelPending = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
RESUME_INTERRUPTS();
|
|
||||||
|
|
||||||
return shardSize;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1394,22 +1394,11 @@ COMMENT ON FUNCTION master_update_node(node_id int, new_node_name text, new_node
|
||||||
|
|
||||||
-- shard statistics
|
-- shard statistics
|
||||||
CREATE OR REPLACE FUNCTION master_update_table_statistics(relation regclass)
|
CREATE OR REPLACE FUNCTION master_update_table_statistics(relation regclass)
|
||||||
RETURNS VOID AS $$
|
RETURNS VOID
|
||||||
DECLARE
|
LANGUAGE C STRICT
|
||||||
colocated_tables regclass[];
|
AS 'MODULE_PATHNAME', $$citus_update_table_statistics$$;
|
||||||
BEGIN
|
COMMENT ON FUNCTION pg_catalog.master_update_table_statistics(regclass)
|
||||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;
|
IS 'updates shard statistics of the given table';
|
||||||
|
|
||||||
PERFORM
|
|
||||||
master_update_shard_statistics(shardid)
|
|
||||||
FROM
|
|
||||||
pg_dist_shard
|
|
||||||
WHERE
|
|
||||||
logicalrelid = ANY (colocated_tables);
|
|
||||||
END;
|
|
||||||
$$ LANGUAGE 'plpgsql';
|
|
||||||
COMMENT ON FUNCTION master_update_table_statistics(regclass)
|
|
||||||
IS 'updates shard statistics of the given table and its colocated tables';
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION get_colocated_shard_array(bigint)
|
CREATE OR REPLACE FUNCTION get_colocated_shard_array(bigint)
|
||||||
RETURNS BIGINT[]
|
RETURNS BIGINT[]
|
||||||
|
|
|
@ -1,17 +1,6 @@
|
||||||
CREATE FUNCTION pg_catalog.citus_update_table_statistics(relation regclass)
|
CREATE FUNCTION pg_catalog.citus_update_table_statistics(relation regclass)
|
||||||
RETURNS VOID AS $$
|
RETURNS VOID
|
||||||
DECLARE
|
LANGUAGE C STRICT
|
||||||
colocated_tables regclass[];
|
AS 'MODULE_PATHNAME', $$citus_update_table_statistics$$;
|
||||||
BEGIN
|
|
||||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;
|
|
||||||
|
|
||||||
PERFORM
|
|
||||||
master_update_shard_statistics(shardid)
|
|
||||||
FROM
|
|
||||||
pg_dist_shard
|
|
||||||
WHERE
|
|
||||||
logicalrelid = ANY (colocated_tables);
|
|
||||||
END;
|
|
||||||
$$ LANGUAGE 'plpgsql';
|
|
||||||
COMMENT ON FUNCTION pg_catalog.citus_update_table_statistics(regclass)
|
COMMENT ON FUNCTION pg_catalog.citus_update_table_statistics(regclass)
|
||||||
IS 'updates shard statistics of the given table and its colocated tables';
|
IS 'updates shard statistics of the given table';
|
||||||
|
|
|
@ -1,17 +1,6 @@
|
||||||
CREATE FUNCTION pg_catalog.citus_update_table_statistics(relation regclass)
|
CREATE FUNCTION pg_catalog.citus_update_table_statistics(relation regclass)
|
||||||
RETURNS VOID AS $$
|
RETURNS VOID
|
||||||
DECLARE
|
LANGUAGE C STRICT
|
||||||
colocated_tables regclass[];
|
AS 'MODULE_PATHNAME', $$citus_update_table_statistics$$;
|
||||||
BEGIN
|
|
||||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;
|
|
||||||
|
|
||||||
PERFORM
|
|
||||||
master_update_shard_statistics(shardid)
|
|
||||||
FROM
|
|
||||||
pg_dist_shard
|
|
||||||
WHERE
|
|
||||||
logicalrelid = ANY (colocated_tables);
|
|
||||||
END;
|
|
||||||
$$ LANGUAGE 'plpgsql';
|
|
||||||
COMMENT ON FUNCTION pg_catalog.citus_update_table_statistics(regclass)
|
COMMENT ON FUNCTION pg_catalog.citus_update_table_statistics(regclass)
|
||||||
IS 'updates shard statistics of the given table and its colocated tables';
|
IS 'updates shard statistics of the given table';
|
||||||
|
|
|
@ -100,9 +100,6 @@ static ForeignConstraintRelationshipNode * CreateOrFindNode(HTAB *adjacencyLists
|
||||||
relid);
|
relid);
|
||||||
static List * GetConnectedListHelper(ForeignConstraintRelationshipNode *node,
|
static List * GetConnectedListHelper(ForeignConstraintRelationshipNode *node,
|
||||||
bool isReferencing);
|
bool isReferencing);
|
||||||
static HTAB * CreateOidVisitedHashSet(void);
|
|
||||||
static bool OidVisited(HTAB *oidVisitedMap, Oid oid);
|
|
||||||
static void VisitOid(HTAB *oidVisitedMap, Oid oid);
|
|
||||||
static List * GetForeignConstraintRelationshipHelper(Oid relationId, bool isReferencing);
|
static List * GetForeignConstraintRelationshipHelper(Oid relationId, bool isReferencing);
|
||||||
|
|
||||||
|
|
||||||
|
@ -442,7 +439,7 @@ GetConnectedListHelper(ForeignConstraintRelationshipNode *node, bool isReferenci
|
||||||
* As hash_create allocates memory in heap, callers are responsible to call
|
* As hash_create allocates memory in heap, callers are responsible to call
|
||||||
* hash_destroy when appropriate.
|
* hash_destroy when appropriate.
|
||||||
*/
|
*/
|
||||||
static HTAB *
|
HTAB *
|
||||||
CreateOidVisitedHashSet(void)
|
CreateOidVisitedHashSet(void)
|
||||||
{
|
{
|
||||||
HASHCTL info = { 0 };
|
HASHCTL info = { 0 };
|
||||||
|
@ -464,7 +461,7 @@ CreateOidVisitedHashSet(void)
|
||||||
/*
|
/*
|
||||||
* OidVisited returns true if given oid is visited according to given oid hash-set.
|
* OidVisited returns true if given oid is visited according to given oid hash-set.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
OidVisited(HTAB *oidVisitedMap, Oid oid)
|
OidVisited(HTAB *oidVisitedMap, Oid oid)
|
||||||
{
|
{
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
@ -476,7 +473,7 @@ OidVisited(HTAB *oidVisitedMap, Oid oid)
|
||||||
/*
|
/*
|
||||||
* VisitOid sets given oid as visited in given hash-set.
|
* VisitOid sets given oid as visited in given hash-set.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
VisitOid(HTAB *oidVisitedMap, Oid oid)
|
VisitOid(HTAB *oidVisitedMap, Oid oid)
|
||||||
{
|
{
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
|
|
@ -22,5 +22,8 @@ extern List * ReferencingRelationIdList(Oid relationId);
|
||||||
extern void SetForeignConstraintRelationshipGraphInvalid(void);
|
extern void SetForeignConstraintRelationshipGraphInvalid(void);
|
||||||
extern bool IsForeignConstraintRelationshipGraphValid(void);
|
extern bool IsForeignConstraintRelationshipGraphValid(void);
|
||||||
extern void ClearForeignConstraintRelationshipGraphContext(void);
|
extern void ClearForeignConstraintRelationshipGraphContext(void);
|
||||||
|
/*
 * Creates the Oid hash-set consumed by OidVisited and VisitOid. The set is
 * allocated via hash_create, so callers are responsible for calling
 * hash_destroy on it when appropriate.
 */
extern HTAB * CreateOidVisitedHashSet(void);
|
||||||
|
/* Returns true if the given oid is marked as visited in the given oid hash-set. */
extern bool OidVisited(HTAB *oidVisitedMap, Oid oid);
|
||||||
|
/* Marks the given oid as visited in the given oid hash-set. */
extern void VisitOid(HTAB *oidVisitedMap, Oid oid);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -36,6 +36,7 @@
|
||||||
#define CSTORE_TABLE_SIZE_FUNCTION "cstore_table_size(%s)"
|
#define CSTORE_TABLE_SIZE_FUNCTION "cstore_table_size(%s)"
|
||||||
|
|
||||||
#define SHARD_SIZES_COLUMN_COUNT 2
|
#define SHARD_SIZES_COLUMN_COUNT 2
|
||||||
|
/*
 * NOTE(review): presumably the number of columns in each result row of the
 * shard statistics query (shard_id, shard_minvalue, shard_maxvalue,
 * shard_size) -- confirm against the code that reads the results.
 */
#define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4
|
||||||
|
|
||||||
/* In-memory representation of a typed tuple in pg_dist_shard. */
|
/* In-memory representation of a typed tuple in pg_dist_shard. */
|
||||||
typedef struct ShardInterval
|
typedef struct ShardInterval
|
||||||
|
@ -206,7 +207,6 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
|
||||||
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
|
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
|
||||||
extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
|
extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
|
||||||
|
|
||||||
|
|
||||||
/* Function declarations to modify shard and shard placement data */
|
/* Function declarations to modify shard and shard placement data */
|
||||||
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
||||||
text *shardMinValue, text *shardMaxValue);
|
text *shardMinValue, text *shardMaxValue);
|
||||||
|
@ -264,5 +264,8 @@ extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray,
|
||||||
int32 intervalTypeMod);
|
int32 intervalTypeMod);
|
||||||
extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
|
extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
|
||||||
Oid *intervalTypeId, int32 *intervalTypeMod);
|
Oid *intervalTypeId, int32 *intervalTypeMod);
|
||||||
|
extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool
|
||||||
|
useDistributedTransaction, bool
|
||||||
|
useShardMinMaxQuery);
|
||||||
|
|
||||||
#endif /* METADATA_UTILITY_H */
|
#endif /* METADATA_UTILITY_H */
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
--
|
||||||
|
-- citus_update_table_statistics.sql
|
||||||
|
--
|
||||||
|
-- Test citus_update_table_statistics function on both
|
||||||
|
-- hash and append distributed tables
|
||||||
|
-- This function updates shardlength, shardminvalue and shardmaxvalue
|
||||||
|
--
|
||||||
|
SET citus.next_shard_id TO 981000;
|
||||||
|
SET citus.next_placement_id TO 982000;
|
||||||
|
SET citus.shard_count TO 8;
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
-- test with a hash-distributed table
|
||||||
|
-- here we update only shardlength, not shardminvalue and shardmaxvalue
|
||||||
|
CREATE TABLE test_table_statistics_hash (id int);
|
||||||
|
SELECT create_distributed_table('test_table_statistics_hash', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- populate table
|
||||||
|
INSERT INTO test_table_statistics_hash SELECT i FROM generate_series(0, 10000)i;
|
||||||
|
-- originally shardlength (size of the shard) is zero
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue AS shardminvalue,
|
||||||
|
ds.shardmaxvalue AS shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength = 0
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test_table_statistics_hash | 981000 | 982000 | test_table_statistics_hash_981000 | -2147483648 | -1610612737
|
||||||
|
test_table_statistics_hash | 981000 | 982001 | test_table_statistics_hash_981000 | -2147483648 | -1610612737
|
||||||
|
test_table_statistics_hash | 981001 | 982002 | test_table_statistics_hash_981001 | -1610612736 | -1073741825
|
||||||
|
test_table_statistics_hash | 981001 | 982003 | test_table_statistics_hash_981001 | -1610612736 | -1073741825
|
||||||
|
test_table_statistics_hash | 981002 | 982004 | test_table_statistics_hash_981002 | -1073741824 | -536870913
|
||||||
|
test_table_statistics_hash | 981002 | 982005 | test_table_statistics_hash_981002 | -1073741824 | -536870913
|
||||||
|
test_table_statistics_hash | 981003 | 982006 | test_table_statistics_hash_981003 | -536870912 | -1
|
||||||
|
test_table_statistics_hash | 981003 | 982007 | test_table_statistics_hash_981003 | -536870912 | -1
|
||||||
|
test_table_statistics_hash | 981004 | 982008 | test_table_statistics_hash_981004 | 0 | 536870911
|
||||||
|
test_table_statistics_hash | 981004 | 982009 | test_table_statistics_hash_981004 | 0 | 536870911
|
||||||
|
test_table_statistics_hash | 981005 | 982010 | test_table_statistics_hash_981005 | 536870912 | 1073741823
|
||||||
|
test_table_statistics_hash | 981005 | 982011 | test_table_statistics_hash_981005 | 536870912 | 1073741823
|
||||||
|
test_table_statistics_hash | 981006 | 982012 | test_table_statistics_hash_981006 | 1073741824 | 1610612735
|
||||||
|
test_table_statistics_hash | 981006 | 982013 | test_table_statistics_hash_981006 | 1073741824 | 1610612735
|
||||||
|
test_table_statistics_hash | 981007 | 982014 | test_table_statistics_hash_981007 | 1610612736 | 2147483647
|
||||||
|
test_table_statistics_hash | 981007 | 982015 | test_table_statistics_hash_981007 | 1610612736 | 2147483647
|
||||||
|
(16 rows)
|
||||||
|
|
||||||
|
-- setting this to on in order to verify that we use a distributed transaction id
|
||||||
|
-- to run the size queries from different connections
|
||||||
|
-- this is going to help detect deadlocks
|
||||||
|
SET citus.log_remote_commands TO ON;
|
||||||
|
-- setting this to sequential in order to have a deterministic order
|
||||||
|
-- in the output of citus.log_remote_commands
|
||||||
|
SET citus.multi_shard_modify_mode TO sequential;
|
||||||
|
-- update table statistics and then check that shardlength has changed
|
||||||
|
-- but shardminvalue and shardmaxvalue stay the same because this is
|
||||||
|
-- a hash distributed table
|
||||||
|
SELECT citus_update_table_statistics('test_table_statistics_hash');
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing COMMIT
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing COMMIT
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
citus_update_table_statistics
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
RESET citus.multi_shard_modify_mode;
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue as shardminvalue,
|
||||||
|
ds.shardmaxvalue as shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength > 0
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test_table_statistics_hash | 981000 | 982000 | test_table_statistics_hash_981000 | -2147483648 | -1610612737
|
||||||
|
test_table_statistics_hash | 981000 | 982001 | test_table_statistics_hash_981000 | -2147483648 | -1610612737
|
||||||
|
test_table_statistics_hash | 981001 | 982002 | test_table_statistics_hash_981001 | -1610612736 | -1073741825
|
||||||
|
test_table_statistics_hash | 981001 | 982003 | test_table_statistics_hash_981001 | -1610612736 | -1073741825
|
||||||
|
test_table_statistics_hash | 981002 | 982004 | test_table_statistics_hash_981002 | -1073741824 | -536870913
|
||||||
|
test_table_statistics_hash | 981002 | 982005 | test_table_statistics_hash_981002 | -1073741824 | -536870913
|
||||||
|
test_table_statistics_hash | 981003 | 982006 | test_table_statistics_hash_981003 | -536870912 | -1
|
||||||
|
test_table_statistics_hash | 981003 | 982007 | test_table_statistics_hash_981003 | -536870912 | -1
|
||||||
|
test_table_statistics_hash | 981004 | 982008 | test_table_statistics_hash_981004 | 0 | 536870911
|
||||||
|
test_table_statistics_hash | 981004 | 982009 | test_table_statistics_hash_981004 | 0 | 536870911
|
||||||
|
test_table_statistics_hash | 981005 | 982010 | test_table_statistics_hash_981005 | 536870912 | 1073741823
|
||||||
|
test_table_statistics_hash | 981005 | 982011 | test_table_statistics_hash_981005 | 536870912 | 1073741823
|
||||||
|
test_table_statistics_hash | 981006 | 982012 | test_table_statistics_hash_981006 | 1073741824 | 1610612735
|
||||||
|
test_table_statistics_hash | 981006 | 982013 | test_table_statistics_hash_981006 | 1073741824 | 1610612735
|
||||||
|
test_table_statistics_hash | 981007 | 982014 | test_table_statistics_hash_981007 | 1610612736 | 2147483647
|
||||||
|
test_table_statistics_hash | 981007 | 982015 | test_table_statistics_hash_981007 | 1610612736 | 2147483647
|
||||||
|
(16 rows)
|
||||||
|
|
||||||
|
-- check with an append-distributed table
|
||||||
|
-- here we update shardlength, shardminvalue and shardmaxvalue
|
||||||
|
CREATE TABLE test_table_statistics_append (id int);
|
||||||
|
SELECT create_distributed_table('test_table_statistics_append', 'id', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY test_table_statistics_append FROM PROGRAM 'echo 0 && echo 1 && echo 2 && echo 3' WITH CSV;
|
||||||
|
COPY test_table_statistics_append FROM PROGRAM 'echo 4 && echo 5 && echo 6 && echo 7' WITH CSV;
|
||||||
|
-- originally shardminvalue and shardmaxvalue will be 0,3 and 4, 7
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue as shardminvalue,
|
||||||
|
ds.shardmaxvalue as shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append')
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 0 | 3
|
||||||
|
test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 0 | 3
|
||||||
|
test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 4 | 7
|
||||||
|
test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 4 | 7
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- delete some data to change shardminvalues of a shards
|
||||||
|
DELETE FROM test_table_statistics_append WHERE id = 0 OR id = 4;
|
||||||
|
SET citus.log_remote_commands TO ON;
|
||||||
|
SET citus.multi_shard_modify_mode TO sequential;
|
||||||
|
-- update table statistics and then check that shardminvalue has changed
|
||||||
|
-- shardlength (shardsize) is still 8192 since there is very few data
|
||||||
|
SELECT citus_update_table_statistics('test_table_statistics_append');
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing COMMIT
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing COMMIT
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
citus_update_table_statistics
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
RESET citus.multi_shard_modify_mode;
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue as shardminvalue,
|
||||||
|
ds.shardmaxvalue as shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append')
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 1 | 3
|
||||||
|
test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 1 | 3
|
||||||
|
test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 5 | 7
|
||||||
|
test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 5 | 7
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
DROP TABLE test_table_statistics_hash, test_table_statistics_append;
|
||||||
|
ALTER SYSTEM RESET citus.shard_count;
|
||||||
|
ALTER SYSTEM RESET citus.shard_replication_factor;
|
|
@ -97,6 +97,11 @@ test: tableam
|
||||||
test: propagate_statistics
|
test: propagate_statistics
|
||||||
test: pg13_propagate_statistics
|
test: pg13_propagate_statistics
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Test for updating table statistics
|
||||||
|
# ----------
|
||||||
|
test: citus_update_table_statistics
|
||||||
|
|
||||||
# ----------
|
# ----------
|
||||||
# Miscellaneous tests to check our query planning behavior
|
# Miscellaneous tests to check our query planning behavior
|
||||||
# ----------
|
# ----------
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
--
|
||||||
|
-- citus_update_table_statistics.sql
|
||||||
|
--
|
||||||
|
-- Test citus_update_table_statistics function on both
|
||||||
|
-- hash and append distributed tables
|
||||||
|
-- This function updates shardlength, shardminvalue and shardmaxvalue
|
||||||
|
--
|
||||||
|
SET citus.next_shard_id TO 981000;
|
||||||
|
SET citus.next_placement_id TO 982000;
|
||||||
|
SET citus.shard_count TO 8;
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
|
||||||
|
-- test with a hash-distributed table
|
||||||
|
-- here we update only shardlength, not shardminvalue and shardmaxvalue
|
||||||
|
CREATE TABLE test_table_statistics_hash (id int);
|
||||||
|
SELECT create_distributed_table('test_table_statistics_hash', 'id');
|
||||||
|
|
||||||
|
-- populate table
|
||||||
|
INSERT INTO test_table_statistics_hash SELECT i FROM generate_series(0, 10000)i;
|
||||||
|
|
||||||
|
-- originally shardlength (size of the shard) is zero
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue AS shardminvalue,
|
||||||
|
ds.shardmaxvalue AS shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength = 0
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
|
||||||
|
-- setting this to on in order to verify that we use a distributed transaction id
|
||||||
|
-- to run the size queries from different connections
|
||||||
|
-- this is going to help detect deadlocks
|
||||||
|
SET citus.log_remote_commands TO ON;
|
||||||
|
|
||||||
|
-- setting this to sequential in order to have a deterministic order
|
||||||
|
-- in the output of citus.log_remote_commands
|
||||||
|
SET citus.multi_shard_modify_mode TO sequential;
|
||||||
|
|
||||||
|
-- update table statistics and then check that shardlength has changed
|
||||||
|
-- but shardminvalue and shardmaxvalue stay the same because this is
|
||||||
|
-- a hash distributed table
|
||||||
|
|
||||||
|
SELECT citus_update_table_statistics('test_table_statistics_hash');
|
||||||
|
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
RESET citus.multi_shard_modify_mode;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue as shardminvalue,
|
||||||
|
ds.shardmaxvalue as shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_hash') AND dsp.shardlength > 0
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
|
||||||
|
-- check with an append-distributed table
|
||||||
|
-- here we update shardlength, shardminvalue and shardmaxvalue
|
||||||
|
CREATE TABLE test_table_statistics_append (id int);
|
||||||
|
SELECT create_distributed_table('test_table_statistics_append', 'id', 'append');
|
||||||
|
COPY test_table_statistics_append FROM PROGRAM 'echo 0 && echo 1 && echo 2 && echo 3' WITH CSV;
|
||||||
|
COPY test_table_statistics_append FROM PROGRAM 'echo 4 && echo 5 && echo 6 && echo 7' WITH CSV;
|
||||||
|
|
||||||
|
-- originally shardminvalue and shardmaxvalue will be 0,3 and 4, 7
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue as shardminvalue,
|
||||||
|
ds.shardmaxvalue as shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append')
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
|
||||||
|
-- delete some data to change shardminvalues of a shards
|
||||||
|
DELETE FROM test_table_statistics_append WHERE id = 0 OR id = 4;
|
||||||
|
|
||||||
|
SET citus.log_remote_commands TO ON;
|
||||||
|
SET citus.multi_shard_modify_mode TO sequential;
|
||||||
|
|
||||||
|
-- update table statistics and then check that shardminvalue has changed
|
||||||
|
-- shardlength (shardsize) is still 8192 since there is very few data
|
||||||
|
SELECT citus_update_table_statistics('test_table_statistics_append');
|
||||||
|
|
||||||
|
RESET citus.log_remote_commands;
|
||||||
|
RESET citus.multi_shard_modify_mode;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
ds.logicalrelid::regclass::text AS tablename,
|
||||||
|
ds.shardid AS shardid,
|
||||||
|
dsp.placementid AS placementid,
|
||||||
|
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
|
||||||
|
ds.shardminvalue as shardminvalue,
|
||||||
|
ds.shardmaxvalue as shardmaxvalue
|
||||||
|
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
|
||||||
|
WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append')
|
||||||
|
ORDER BY 2, 3;
|
||||||
|
|
||||||
|
DROP TABLE test_table_statistics_hash, test_table_statistics_append;
|
||||||
|
ALTER SYSTEM RESET citus.shard_count;
|
||||||
|
ALTER SYSTEM RESET citus.shard_replication_factor;
|
Loading…
Reference in New Issue