Stop updating shard range in citus_update_shard_statistics

pull/5454/head
Marco Slot 2021-11-10 09:44:06 +01:00
parent 6ff65db7ee
commit 56eae48daf
6 changed files with 318 additions and 517 deletions

View File

@ -80,26 +80,18 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError, SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize); uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
useShardMinMaxQuery);
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType); static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType); static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
List *citusTableIds, bool List *citusTableIds);
useShardMinMaxQuery); static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds);
static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds,
bool useShardMinMaxQuery);
static void ErrorIfNotSuitableToGetSize(Oid relationId); static void ErrorIfNotSuitableToGetSize(Oid relationId);
static List * OpenConnectionToNodes(List *workerNodeList); static List * OpenConnectionToNodes(List *workerNodeList);
static void ReceiveShardNameAndSizeResults(List *connectionList, static void ReceiveShardNameAndSizeResults(List *connectionList,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor); TupleDesc tupleDescriptor);
static void AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId, static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval);
ShardInterval *
shardInterval, char *shardName,
char *quotedShardName);
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
char *quotedShardName);
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes, static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes); uint64 totalBytes);
@ -245,11 +237,8 @@ citus_shard_sizes(PG_FUNCTION_ARGS)
/* we don't need a distributed transaction here */ /* we don't need a distributed transaction here */
bool useDistributedTransaction = false; bool useDistributedTransaction = false;
/* we only want the shard sizes here so useShardMinMaxQuery parameter is false */ List *connectionList =
bool useShardMinMaxQuery = false; SendShardStatisticsQueriesInParallel(allCitusTableIds, useDistributedTransaction);
List *connectionList = SendShardStatisticsQueriesInParallel(allCitusTableIds,
useDistributedTransaction,
useShardMinMaxQuery);
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
@ -342,15 +331,12 @@ citus_relation_size(PG_FUNCTION_ARGS)
* to available nodes. It returns the connection list. * to available nodes. It returns the connection list.
*/ */
List * List *
SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction, SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction)
bool
useShardMinMaxQuery)
{ {
List *workerNodeList = ActivePrimaryNodeList(NoLock); List *workerNodeList = ActivePrimaryNodeList(NoLock);
List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList, List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList,
citusTableIds, citusTableIds);
useShardMinMaxQuery);
List *connectionList = OpenConnectionToNodes(workerNodeList); List *connectionList = OpenConnectionToNodes(workerNodeList);
FinishConnectionListEstablishment(connectionList); FinishConnectionListEstablishment(connectionList);
@ -415,20 +401,18 @@ OpenConnectionToNodes(List *workerNodeList)
/* /*
* GenerateShardStatisticsQueryList generates a query per node that will return: * GenerateShardStatisticsQueryList generates a query per node that will return:
* - all shard_name, shard_size pairs from the node (if includeShardMinMax is false) * shard_id, shard_name, shard_size for all shard placements on the node
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples from the node (if true)
*/ */
static List * static List *
GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, bool GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds)
useShardMinMaxQuery)
{ {
List *shardStatisticsQueryList = NIL; List *shardStatisticsQueryList = NIL;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
{ {
char *shardStatisticsQuery = GenerateAllShardStatisticsQueryForNode(workerNode, char *shardStatisticsQuery =
citusTableIds, GenerateAllShardStatisticsQueryForNode(workerNode, citusTableIds);
useShardMinMaxQuery);
shardStatisticsQueryList = lappend(shardStatisticsQueryList, shardStatisticsQueryList = lappend(shardStatisticsQueryList,
shardStatisticsQuery); shardStatisticsQuery);
} }
@ -479,12 +463,13 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
char *tableName = PQgetvalue(result, rowIndex, 0); /* format is [0] shard id, [1] shard name, [2] size */
char *tableName = PQgetvalue(result, rowIndex, 1);
Datum resultStringDatum = CStringGetDatum(tableName); Datum resultStringDatum = CStringGetDatum(tableName);
Datum textDatum = DirectFunctionCall1(textin, resultStringDatum); Datum textDatum = DirectFunctionCall1(textin, resultStringDatum);
values[0] = textDatum; values[0] = textDatum;
values[1] = ParseIntField(result, rowIndex, 1); values[1] = ParseIntField(result, rowIndex, 2);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
} }
@ -858,12 +843,10 @@ GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType)
/* /*
* GenerateAllShardStatisticsQueryForNode generates a query that returns: * GenerateAllShardStatisticsQueryForNode generates a query that returns:
* - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false) * shard_id, shard_name, shard_size for all shard placements on the node
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples (if true)
*/ */
static char * static char *
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds, bool GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds)
useShardMinMaxQuery)
{ {
StringInfo allShardStatisticsQuery = makeStringInfo(); StringInfo allShardStatisticsQuery = makeStringInfo();
@ -881,61 +864,32 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
relationId); relationId);
char *shardStatisticsQuery = char *shardStatisticsQuery =
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode, GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode);
useShardMinMaxQuery);
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery); appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
relation_close(relation, AccessShareLock); relation_close(relation, AccessShareLock);
} }
} }
/* Add a dummy entry so that UNION ALL doesn't complain */ /* Add a dummy entry so that UNION ALL doesn't complain */
if (useShardMinMaxQuery) appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, NULL::text, 0::bigint;");
{
/* 0 for shard_id, NULL for min, NULL for text, 0 for shard_size */
appendStringInfo(allShardStatisticsQuery,
"SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;");
}
else
{
/* NULL for shard_name, 0 for shard_size */
appendStringInfo(allShardStatisticsQuery, "SELECT NULL::text, 0::bigint;");
}
return allShardStatisticsQuery->data; return allShardStatisticsQuery->data;
} }
/* /*
* GenerateShardStatisticsQueryForShardList generates one of the two types of queries: * GenerateShardStatisticsQueryForShardList generates a query that returns:
* - SELECT shard_name - shard_size (if useShardMinMaxQuery is false) * SELECT shard_id, shard_name, shard_size for all shards in the list
* - SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size (if true)
*/ */
static char * static char *
GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
useShardMinMaxQuery)
{ {
StringInfo selectQuery = makeStringInfo(); StringInfo selectQuery = makeStringInfo();
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList) foreach_ptr(shardInterval, shardIntervalList)
{ {
uint64 shardId = shardInterval->shardId; AppendShardSizeQuery(selectQuery, shardInterval);
Oid schemaId = get_rel_namespace(shardInterval->relationId);
char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId);
AppendShardIdToName(&shardName, shardId);
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
if (useShardMinMaxQuery)
{
AppendShardSizeMinMaxQuery(selectQuery, shardId, shardInterval, shardName,
quotedShardName);
}
else
{
AppendShardSizeQuery(selectQuery, shardInterval, quotedShardName);
}
appendStringInfo(selectQuery, " UNION ALL "); appendStringInfo(selectQuery, " UNION ALL ");
} }
@ -943,50 +897,25 @@ GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
} }
/*
* AppendShardSizeMinMaxQuery appends a query in the following form to selectQuery
* SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size
*/
static void
AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
ShardInterval *shardInterval, char *shardName,
char *quotedShardName)
{
if (IsCitusTableType(shardInterval->relationId, APPEND_DISTRIBUTED))
{
/* fill in the partition column name */
const uint32 unusedTableId = 1;
Var *partitionColumn = PartitionColumn(shardInterval->relationId,
unusedTableId);
char *partitionColumnName = get_attname(shardInterval->relationId,
partitionColumn->varattno, false);
appendStringInfo(selectQuery,
"SELECT " UINT64_FORMAT
" AS shard_id, min(%s)::text AS shard_minvalue, max(%s)::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size FROM %s ",
shardId, partitionColumnName,
partitionColumnName,
quotedShardName, shardName);
}
else
{
/* we don't need to update min/max for non-append distributed tables because they don't change */
appendStringInfo(selectQuery,
"SELECT " UINT64_FORMAT
" AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size ",
shardId, quotedShardName);
}
}
/* /*
* AppendShardSizeQuery appends a query in the following form to selectQuery * AppendShardSizeQuery appends a query in the following form to selectQuery
* SELECT shard_name, shard_size * SELECT shard_id, shard_name, shard_size
*/ */
static void static void
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval, AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
char *quotedShardName)
{ {
appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName); uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId);
AppendShardIdToName(&shardName, shardId);
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
appendStringInfo(selectQuery, "%s AS shard_name, ", quotedShardName);
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName); appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
} }

View File

@ -65,16 +65,13 @@
/* Local functions forward declarations */ /* Local functions forward declarations */
static List * RelationShardListForShardCreate(ShardInterval *shardInterval); static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
const char *shardName, uint64 *shardSize, const char *shardName, uint64 *shardSize);
text **shardMinValue, text **shardMaxValue);
static void UpdateTableStatistics(Oid relationId); static void UpdateTableStatistics(Oid relationId);
static void ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList); static void ReceiveAndUpdateShardsSizes(List *connectionList);
static void UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid static void UpdateShardSize(uint64 shardId, ShardInterval *shardInterval,
relationId, List *shardPlacementList, uint64 Oid relationId, List *shardPlacementList,
shardSize, text *shardMinValue, uint64 shardSize);
text *shardMaxValue);
static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId, static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
text **shardMinValue, text **shardMaxValue,
uint64 *shardSize); uint64 *shardSize);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
@ -707,8 +704,6 @@ UpdateShardStatistics(int64 shardId)
Oid relationId = shardInterval->relationId; Oid relationId = shardInterval->relationId;
bool statsOK = false; bool statsOK = false;
uint64 shardSize = 0; uint64 shardSize = 0;
text *minValue = NULL;
text *maxValue = NULL;
/* Build shard qualified name. */ /* Build shard qualified name. */
char *shardName = get_rel_name(relationId); char *shardName = get_rel_name(relationId);
@ -726,7 +721,7 @@ UpdateShardStatistics(int64 shardId)
foreach_ptr(placement, shardPlacementList) foreach_ptr(placement, shardPlacementList)
{ {
statsOK = WorkerShardStats(placement, relationId, shardQualifiedName, statsOK = WorkerShardStats(placement, relationId, shardQualifiedName,
&shardSize, &minValue, &maxValue); &shardSize);
if (statsOK) if (statsOK)
{ {
break; break;
@ -747,8 +742,9 @@ UpdateShardStatistics(int64 shardId)
errdetail("Setting shard statistics to NULL"))); errdetail("Setting shard statistics to NULL")));
} }
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, shardPlacementList, UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList,
shardSize, minValue, maxValue); shardSize);
return shardSize; return shardSize;
} }
@ -766,24 +762,20 @@ UpdateTableStatistics(Oid relationId)
/* we want to use a distributed transaction here to detect distributed deadlocks */ /* we want to use a distributed transaction here to detect distributed deadlocks */
bool useDistributedTransaction = true; bool useDistributedTransaction = true;
/* we also want shard min/max values for append distributed tables */ List *connectionList =
bool useShardMinMaxQuery = true; SendShardStatisticsQueriesInParallel(citusTableIds, useDistributedTransaction);
List *connectionList = SendShardStatisticsQueriesInParallel(citusTableIds, ReceiveAndUpdateShardsSizes(connectionList);
useDistributedTransaction,
useShardMinMaxQuery);
ReceiveAndUpdateShardsSizeAndMinMax(connectionList);
} }
/* /*
* ReceiveAndUpdateShardsSizeAndMinMax receives shard id, size * ReceiveAndUpdateShardsSizes receives shard id and size
* and min max results from the given connection list, and updates * results from the given connection list, and updates
* respective entries in pg_dist_placement and pg_dist_shard * respective entries in pg_dist_placement.
*/ */
static void static void
ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList) ReceiveAndUpdateShardsSizes(List *connectionList)
{ {
/* /*
* From the connection list, we will not get all the shards, but * From the connection list, we will not get all the shards, but
@ -812,7 +804,7 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
int64 colCount = PQnfields(result); int64 colCount = PQnfields(result);
/* Although it is not expected */ /* Although it is not expected */
if (colCount != UPDATE_SHARD_STATISTICS_COLUMN_COUNT) if (colCount != SHARD_SIZES_COLUMN_COUNT)
{ {
ereport(WARNING, (errmsg("unexpected number of columns from " ereport(WARNING, (errmsg("unexpected number of columns from "
"citus_update_table_statistics"))); "citus_update_table_statistics")));
@ -822,12 +814,9 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
uint64 shardId = 0; uint64 shardId = 0;
text *shardMinValue = NULL;
text *shardMaxValue = NULL;
uint64 shardSize = 0; uint64 shardSize = 0;
if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardMinValue, if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardSize))
&shardMaxValue, &shardSize))
{ {
/* this row has no valid shard statistics */ /* this row has no valid shard statistics */
continue; continue;
@ -845,9 +834,8 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
Oid relationId = shardInterval->relationId; Oid relationId = shardInterval->relationId;
List *shardPlacementList = ActiveShardPlacementList(shardId); List *shardPlacementList = ActiveShardPlacementList(shardId);
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList,
shardPlacementList, shardSize, shardMinValue, shardSize);
shardMaxValue);
} }
PQclear(result); PQclear(result);
ForgetResults(connection); ForgetResults(connection);
@ -860,10 +848,13 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
* ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult * ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult
* - it returns true if this row belongs to a valid shard * - it returns true if this row belongs to a valid shard
* - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID) * - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID)
*
* Input tuples are assumed to be of the form:
* (shard_id bigint, shard_name text, shard_size bigint)
*/ */
static bool static bool
ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId, ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
text **shardMinValue, text **shardMaxValue, uint64 *shardSize) uint64 *shardSize)
{ {
*shardId = ParseIntField(result, rowIndex, 0); *shardId = ParseIntField(result, rowIndex, 0);
@ -874,28 +865,19 @@ ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
return false; return false;
} }
char *minValueResult = PQgetvalue(result, rowIndex, 1); *shardSize = ParseIntField(result, rowIndex, 2);
char *maxValueResult = PQgetvalue(result, rowIndex, 2);
*shardMinValue = cstring_to_text(minValueResult);
*shardMaxValue = cstring_to_text(maxValueResult);
*shardSize = ParseIntField(result, rowIndex, 3);
return true; return true;
} }
/* /*
* UpdateShardSizeAndMinMax updates the shardlength (shard size) of the given * UpdateShardSize updates the shardlength (shard size) of the given
* shard and its placements in pg_dist_placement, and updates the shard min value * shard and its placements in pg_dist_placement.
* and shard max value of the given shard in pg_dist_shard if the relationId belongs
* to an append-distributed table
*/ */
static void static void
UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relationId, UpdateShardSize(uint64 shardId, ShardInterval *shardInterval, Oid relationId,
List *shardPlacementList, uint64 shardSize, text *shardMinValue, List *shardPlacementList, uint64 shardSize)
text *shardMaxValue)
{ {
char storageType = shardInterval->storageType;
ShardPlacement *placement = NULL; ShardPlacement *placement = NULL;
/* update metadata for each shard placement */ /* update metadata for each shard placement */
@ -906,16 +888,7 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat
DeleteShardPlacementRow(placementId); DeleteShardPlacementRow(placementId);
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE,
shardSize, shardSize, groupId);
groupId);
}
/* only update shard min/max values for append-partitioned tables */
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
{
DeleteShardRow(shardId);
InsertShardRow(relationId, shardId, storageType, shardMinValue,
shardMaxValue);
} }
} }
@ -926,17 +899,10 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat
*/ */
static bool static bool
WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardName, WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardName,
uint64 *shardSize, text **shardMinValue, text **shardMaxValue) uint64 *shardSize)
{ {
StringInfo tableSizeQuery = makeStringInfo(); StringInfo tableSizeQuery = makeStringInfo();
const uint32 unusedTableId = 1;
StringInfo partitionValueQuery = makeStringInfo();
PGresult *queryResult = NULL; PGresult *queryResult = NULL;
const int minValueIndex = 0;
const int maxValueIndex = 1;
char *tableSizeStringEnd = NULL; char *tableSizeStringEnd = NULL;
int connectionFlags = 0; int connectionFlags = 0;
@ -951,8 +917,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
Assert(connection != NULL); Assert(connection != NULL);
*shardSize = 0; *shardSize = 0;
*shardMinValue = NULL;
*shardMaxValue = NULL;
char *quotedShardName = quote_literal_cstr(shardName); char *quotedShardName = quote_literal_cstr(shardName);
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName); appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
@ -986,40 +950,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
PQclear(queryResult); PQclear(queryResult);
ForgetResults(connection); ForgetResults(connection);
if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED))
{
/* we don't need min/max for non-append distributed tables */
return true;
}
/* fill in the partition column name and shard name in the query. */
Var *partitionColumn = PartitionColumn(relationId, unusedTableId);
char *partitionColumnName = get_attname(relationId, partitionColumn->varattno, false);
appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY,
partitionColumnName, partitionColumnName, shardName);
executeCommand = ExecuteOptionalRemoteCommand(connection, partitionValueQuery->data,
&queryResult);
if (executeCommand != 0)
{
return false;
}
bool minValueIsNull = PQgetisnull(queryResult, 0, minValueIndex);
bool maxValueIsNull = PQgetisnull(queryResult, 0, maxValueIndex);
if (!minValueIsNull && !maxValueIsNull)
{
char *minValueResult = PQgetvalue(queryResult, 0, minValueIndex);
char *maxValueResult = PQgetvalue(queryResult, 0, maxValueIndex);
*shardMinValue = cstring_to_text(minValueResult);
*shardMaxValue = cstring_to_text(maxValueResult);
}
PQclear(queryResult);
ForgetResults(connection);
return true; return true;
} }

View File

@ -39,8 +39,7 @@
#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \ #define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
"worker_partitioned_relation_total_size(%s)" "worker_partitioned_relation_total_size(%s)"
#define SHARD_SIZES_COLUMN_COUNT 2 #define SHARD_SIZES_COLUMN_COUNT (3)
#define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4
/* In-memory representation of a typed tuple in pg_dist_shard. */ /* In-memory representation of a typed tuple in pg_dist_shard. */
typedef struct ShardInterval typedef struct ShardInterval
@ -281,9 +280,8 @@ extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray,
int32 intervalTypeMod); int32 intervalTypeMod);
extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn, extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
Oid *intervalTypeId, int32 *intervalTypeMod); Oid *intervalTypeId, int32 *intervalTypeMod);
extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds,
useDistributedTransaction, bool bool useDistributedTransaction);
useShardMinMaxQuery);
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
uint64 *availableBytes, uint64 *availableBytes,
uint64 *totalBytes); uint64 *totalBytes);

View File

@ -64,11 +64,11 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_hash'); SELECT citus_update_table_statistics('test_table_statistics_hash');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -152,11 +152,11 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_append'); SELECT citus_update_table_statistics('test_table_statistics_append');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -181,10 +181,10 @@ WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append')
ORDER BY 2, 3; ORDER BY 2, 3;
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
--------------------------------------------------------------------- ---------------------------------------------------------------------
test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 1 | 3 test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | |
test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 1 | 3 test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | |
test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 5 | 7 test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | |
test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 5 | 7 test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | |
(4 rows) (4 rows)
DROP TABLE test_table_statistics_hash, test_table_statistics_append; DROP TABLE test_table_statistics_hash, test_table_statistics_append;

View File

@ -112,11 +112,8 @@ SELECT count(*) FROM customer_copy_hash;
-- Confirm that data was copied -- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash; SELECT count(*) FROM customer_copy_hash;
-- Make sure that master_update_shard_statistics() only updates shard length for -- Update shard statistics for hash-partitioned table
-- hash-partitioned tables SELECT citus_update_shard_statistics(560000);
SELECT master_update_shard_statistics(560000);
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000;
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
@ -206,24 +203,13 @@ FROM customer_copy_range WHERE c_custkey <= 500;
-- Check whether data was copied -- Check whether data was copied
SELECT count(*) FROM customer_copy_range; SELECT count(*) FROM customer_copy_range;
-- Manipulate min/max values and check shard statistics for new shard
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000
WHERE shardid = :new_shard_id;
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
-- Update shard statistics for range-partitioned shard and check that only the -- Update shard statistics for range-partitioned shard
-- shard length is updated. SELECT citus_update_shard_statistics(:new_shard_id);
SELECT master_update_shard_statistics(:new_shard_id);
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
-- Revert back min/max value updates
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
WHERE shardid = :new_shard_id;
-- Create a new append-partitioned table into which to COPY -- Create a new append-partitioned table into which to COPY
CREATE TABLE customer_copy_append ( CREATE TABLE customer_copy_append (
c_custkey integer, c_custkey integer,
@ -272,16 +258,13 @@ END;
SELECT * FROM customer_copy_append; SELECT * FROM customer_copy_append;
-- Manipulate manipulate and check shard statistics for append-partitioned table shard -- Manipulate manipulate and check shard statistics for append-partitioned table shard
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560132;
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132; UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132;
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
-- Update shard statistics for append-partitioned shard -- Update shard statistics for append-partitioned shard
SELECT master_update_shard_statistics(560132); SELECT citus_update_shard_statistics(560132);
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
-- Create lineitem table -- Create lineitem table

File diff suppressed because it is too large Load Diff