mirror of https://github.com/citusdata/citus.git
Merge branch 'master' into velioglu/make_object_lock_explicit
commit
6590f12de4
|
@ -74,7 +74,7 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)
|
|||
* user-friendly, but this function is really only meant to be called
|
||||
* from the trigger.
|
||||
*/
|
||||
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
|
||||
if (!IsCitusTableViaCatalog(relationId) || !EnableDDLPropagation)
|
||||
{
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -134,14 +134,14 @@ MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName
|
|||
* user-friendly, but this function is really only meant to be called
|
||||
* from the trigger.
|
||||
*/
|
||||
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
|
||||
if (!IsCitusTableViaCatalog(relationId) || !EnableDDLPropagation)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
if (!ShouldSyncTableMetadata(relationId))
|
||||
if (!ShouldSyncTableMetadataViaCatalog(relationId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -207,7 +207,6 @@ static ScanKeyData DistObjectScanKey[3];
|
|||
|
||||
|
||||
/* local function forward declarations */
|
||||
static bool IsCitusTableViaCatalog(Oid relationId);
|
||||
static HeapTuple PgDistPartitionTupleViaCatalog(Oid relationId);
|
||||
static ShardIdCacheEntry * LookupShardIdCacheEntry(int64 shardId);
|
||||
static CitusTableCacheEntry * BuildCitusTableCacheEntry(Oid relationId);
|
||||
|
@ -484,7 +483,7 @@ IsCitusTable(Oid relationId)
|
|||
* offset and the corresponding index. If we ever come close to changing
|
||||
* that, we'll have to work a bit harder.
|
||||
*/
|
||||
static bool
|
||||
bool
|
||||
IsCitusTableViaCatalog(Oid relationId)
|
||||
{
|
||||
HeapTuple partitionTuple = PgDistPartitionTupleViaCatalog(relationId);
|
||||
|
@ -538,6 +537,51 @@ PartitionMethodViaCatalog(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* PartitionColumnViaCatalog gets a relationId and returns the partition
|
||||
* key column from pg_dist_partition via reading from catalog.
|
||||
*/
|
||||
Var *
|
||||
PartitionColumnViaCatalog(Oid relationId)
|
||||
{
|
||||
HeapTuple partitionTuple = PgDistPartitionTupleViaCatalog(relationId);
|
||||
if (!HeapTupleIsValid(partitionTuple))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
if (isNullArray[Anum_pg_dist_partition_partkey - 1])
|
||||
{
|
||||
/* partition key cannot be NULL, still let's make sure */
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Datum partitionKeyDatum = datumArray[Anum_pg_dist_partition_partkey - 1];
|
||||
char *partitionKeyString = TextDatumGetCString(partitionKeyDatum);
|
||||
|
||||
/* convert the string to a Node and ensure it is a Var */
|
||||
Node *partitionNode = stringToNode(partitionKeyString);
|
||||
Assert(IsA(partitionNode, Var));
|
||||
|
||||
Var *partitionColumn = (Var *) partitionNode;
|
||||
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
|
||||
return partitionColumn;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PgDistPartitionTupleViaCatalog is a helper function that searches
|
||||
* pg_dist_partition for the given relationId. The caller is responsible
|
||||
|
|
|
@ -88,6 +88,8 @@ static char * TruncateTriggerCreateCommand(Oid relationId);
|
|||
static char * SchemaOwnerName(Oid objectId);
|
||||
static bool HasMetadataWorkers(void);
|
||||
static List * DetachPartitionCommandList(void);
|
||||
static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
|
||||
bool citusTableWithNoDistKey);
|
||||
static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
|
||||
static void DropMetadataSnapshotOnNode(WorkerNode *workerNode);
|
||||
static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId,
|
||||
|
@ -380,15 +382,51 @@ ShouldSyncTableMetadata(Oid relationId)
|
|||
|
||||
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
||||
|
||||
if (IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) ||
|
||||
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else
|
||||
bool hashDistributed = IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED);
|
||||
bool citusTableWithNoDistKey =
|
||||
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY);
|
||||
|
||||
return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldSyncTableMetadataViaCatalog checks if the metadata of a distributed table should
|
||||
* be propagated to metadata workers, i.e. the table is an MX table or reference table.
|
||||
* Tables with streaming replication model (which means RF=1) and hash distribution are
|
||||
* considered as MX tables while tables with none distribution are reference tables.
|
||||
*
|
||||
* ShouldSyncTableMetadataViaCatalog does not use the CitusTableCache and instead reads
|
||||
* from catalog tables directly.
|
||||
*/
|
||||
bool
|
||||
ShouldSyncTableMetadataViaCatalog(Oid relationId)
|
||||
{
|
||||
if (!OidIsValid(relationId) || !IsCitusTableViaCatalog(relationId))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
char partitionMethod = PartitionMethodViaCatalog(relationId);
|
||||
bool hashDistributed = partitionMethod == DISTRIBUTE_BY_HASH;
|
||||
bool citusTableWithNoDistKey = partitionMethod == DISTRIBUTE_BY_NONE;
|
||||
|
||||
return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldSyncTableMetadataInternal decides whether we should sync the metadata for a table
|
||||
* based on whether it is a hash distributed table, or a citus table with no distribution
|
||||
* key.
|
||||
*
|
||||
* This function is here to make sure that ShouldSyncTableMetadata and
|
||||
* ShouldSyncTableMetadataViaCatalog behaves the same way.
|
||||
*/
|
||||
static bool
|
||||
ShouldSyncTableMetadataInternal(bool hashDistributed, bool citusTableWithNoDistKey)
|
||||
{
|
||||
return hashDistributed || citusTableWithNoDistKey;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -80,26 +80,18 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
|||
SizeQueryType sizeQueryType, bool failOnError,
|
||||
uint64 *tableSize);
|
||||
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
||||
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
||||
useShardMinMaxQuery);
|
||||
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
|
||||
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
|
||||
static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
|
||||
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
|
||||
List *citusTableIds, bool
|
||||
useShardMinMaxQuery);
|
||||
static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds,
|
||||
bool useShardMinMaxQuery);
|
||||
List *citusTableIds);
|
||||
static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds);
|
||||
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
||||
static List * OpenConnectionToNodes(List *workerNodeList);
|
||||
static void ReceiveShardNameAndSizeResults(List *connectionList,
|
||||
Tuplestorestate *tupleStore,
|
||||
TupleDesc tupleDescriptor);
|
||||
static void AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
|
||||
ShardInterval *
|
||||
shardInterval, char *shardName,
|
||||
char *quotedShardName);
|
||||
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
|
||||
char *quotedShardName);
|
||||
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval);
|
||||
|
||||
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
|
||||
uint64 totalBytes);
|
||||
|
@ -245,11 +237,8 @@ citus_shard_sizes(PG_FUNCTION_ARGS)
|
|||
/* we don't need a distributed transaction here */
|
||||
bool useDistributedTransaction = false;
|
||||
|
||||
/* we only want the shard sizes here so useShardMinMaxQuery parameter is false */
|
||||
bool useShardMinMaxQuery = false;
|
||||
List *connectionList = SendShardStatisticsQueriesInParallel(allCitusTableIds,
|
||||
useDistributedTransaction,
|
||||
useShardMinMaxQuery);
|
||||
List *connectionList =
|
||||
SendShardStatisticsQueriesInParallel(allCitusTableIds, useDistributedTransaction);
|
||||
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||
|
@ -342,15 +331,12 @@ citus_relation_size(PG_FUNCTION_ARGS)
|
|||
* to available nodes. It returns the connection list.
|
||||
*/
|
||||
List *
|
||||
SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction,
|
||||
bool
|
||||
useShardMinMaxQuery)
|
||||
SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction)
|
||||
{
|
||||
List *workerNodeList = ActivePrimaryNodeList(NoLock);
|
||||
|
||||
List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList,
|
||||
citusTableIds,
|
||||
useShardMinMaxQuery);
|
||||
citusTableIds);
|
||||
|
||||
List *connectionList = OpenConnectionToNodes(workerNodeList);
|
||||
FinishConnectionListEstablishment(connectionList);
|
||||
|
@ -415,20 +401,18 @@ OpenConnectionToNodes(List *workerNodeList)
|
|||
|
||||
/*
|
||||
* GenerateShardStatisticsQueryList generates a query per node that will return:
|
||||
* - all shard_name, shard_size pairs from the node (if includeShardMinMax is false)
|
||||
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples from the node (if true)
|
||||
* shard_id, shard_name, shard_size for all shard placements on the node
|
||||
*/
|
||||
static List *
|
||||
GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, bool
|
||||
useShardMinMaxQuery)
|
||||
GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds)
|
||||
{
|
||||
List *shardStatisticsQueryList = NIL;
|
||||
WorkerNode *workerNode = NULL;
|
||||
foreach_ptr(workerNode, workerNodeList)
|
||||
{
|
||||
char *shardStatisticsQuery = GenerateAllShardStatisticsQueryForNode(workerNode,
|
||||
citusTableIds,
|
||||
useShardMinMaxQuery);
|
||||
char *shardStatisticsQuery =
|
||||
GenerateAllShardStatisticsQueryForNode(workerNode, citusTableIds);
|
||||
|
||||
shardStatisticsQueryList = lappend(shardStatisticsQueryList,
|
||||
shardStatisticsQuery);
|
||||
}
|
||||
|
@ -479,12 +463,13 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore
|
|||
memset(values, 0, sizeof(values));
|
||||
memset(isNulls, false, sizeof(isNulls));
|
||||
|
||||
char *tableName = PQgetvalue(result, rowIndex, 0);
|
||||
/* format is [0] shard id, [1] shard name, [2] size */
|
||||
char *tableName = PQgetvalue(result, rowIndex, 1);
|
||||
Datum resultStringDatum = CStringGetDatum(tableName);
|
||||
Datum textDatum = DirectFunctionCall1(textin, resultStringDatum);
|
||||
|
||||
values[0] = textDatum;
|
||||
values[1] = ParseIntField(result, rowIndex, 1);
|
||||
values[1] = ParseIntField(result, rowIndex, 2);
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||
}
|
||||
|
@ -858,12 +843,10 @@ GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType)
|
|||
|
||||
/*
|
||||
* GenerateAllShardStatisticsQueryForNode generates a query that returns:
|
||||
* - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false)
|
||||
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples (if true)
|
||||
* shard_id, shard_name, shard_size for all shard placements on the node
|
||||
*/
|
||||
static char *
|
||||
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds, bool
|
||||
useShardMinMaxQuery)
|
||||
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds)
|
||||
{
|
||||
StringInfo allShardStatisticsQuery = makeStringInfo();
|
||||
|
||||
|
@ -881,61 +864,32 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI
|
|||
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
|
||||
relationId);
|
||||
char *shardStatisticsQuery =
|
||||
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode,
|
||||
useShardMinMaxQuery);
|
||||
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode);
|
||||
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
|
||||
relation_close(relation, AccessShareLock);
|
||||
}
|
||||
}
|
||||
|
||||
/* Add a dummy entry so that UNION ALL doesn't complain */
|
||||
if (useShardMinMaxQuery)
|
||||
{
|
||||
/* 0 for shard_id, NULL for min, NULL for text, 0 for shard_size */
|
||||
appendStringInfo(allShardStatisticsQuery,
|
||||
"SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;");
|
||||
}
|
||||
else
|
||||
{
|
||||
/* NULL for shard_name, 0 for shard_size */
|
||||
appendStringInfo(allShardStatisticsQuery, "SELECT NULL::text, 0::bigint;");
|
||||
}
|
||||
appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, NULL::text, 0::bigint;");
|
||||
|
||||
return allShardStatisticsQuery->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateShardStatisticsQueryForShardList generates one of the two types of queries:
|
||||
* - SELECT shard_name - shard_size (if useShardMinMaxQuery is false)
|
||||
* - SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size (if true)
|
||||
* GenerateShardStatisticsQueryForShardList generates a query that returns:
|
||||
* SELECT shard_id, shard_name, shard_size for all shards in the list
|
||||
*/
|
||||
static char *
|
||||
GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
||||
useShardMinMaxQuery)
|
||||
GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
|
||||
{
|
||||
StringInfo selectQuery = makeStringInfo();
|
||||
|
||||
ShardInterval *shardInterval = NULL;
|
||||
foreach_ptr(shardInterval, shardIntervalList)
|
||||
{
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
Oid schemaId = get_rel_namespace(shardInterval->relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *shardName = get_rel_name(shardInterval->relationId);
|
||||
AppendShardIdToName(&shardName, shardId);
|
||||
|
||||
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
|
||||
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
|
||||
|
||||
if (useShardMinMaxQuery)
|
||||
{
|
||||
AppendShardSizeMinMaxQuery(selectQuery, shardId, shardInterval, shardName,
|
||||
quotedShardName);
|
||||
}
|
||||
else
|
||||
{
|
||||
AppendShardSizeQuery(selectQuery, shardInterval, quotedShardName);
|
||||
}
|
||||
AppendShardSizeQuery(selectQuery, shardInterval);
|
||||
appendStringInfo(selectQuery, " UNION ALL ");
|
||||
}
|
||||
|
||||
|
@ -943,50 +897,25 @@ GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* AppendShardSizeMinMaxQuery appends a query in the following form to selectQuery
|
||||
* SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size
|
||||
*/
|
||||
static void
|
||||
AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
|
||||
ShardInterval *shardInterval, char *shardName,
|
||||
char *quotedShardName)
|
||||
{
|
||||
if (IsCitusTableType(shardInterval->relationId, APPEND_DISTRIBUTED))
|
||||
{
|
||||
/* fill in the partition column name */
|
||||
const uint32 unusedTableId = 1;
|
||||
Var *partitionColumn = PartitionColumn(shardInterval->relationId,
|
||||
unusedTableId);
|
||||
char *partitionColumnName = get_attname(shardInterval->relationId,
|
||||
partitionColumn->varattno, false);
|
||||
appendStringInfo(selectQuery,
|
||||
"SELECT " UINT64_FORMAT
|
||||
" AS shard_id, min(%s)::text AS shard_minvalue, max(%s)::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size FROM %s ",
|
||||
shardId, partitionColumnName,
|
||||
partitionColumnName,
|
||||
quotedShardName, shardName);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* we don't need to update min/max for non-append distributed tables because they don't change */
|
||||
appendStringInfo(selectQuery,
|
||||
"SELECT " UINT64_FORMAT
|
||||
" AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size ",
|
||||
shardId, quotedShardName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AppendShardSizeQuery appends a query in the following form to selectQuery
|
||||
* SELECT shard_name, shard_size
|
||||
* SELECT shard_id, shard_name, shard_size
|
||||
*/
|
||||
static void
|
||||
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
|
||||
char *quotedShardName)
|
||||
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
|
||||
{
|
||||
appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
Oid schemaId = get_rel_namespace(shardInterval->relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *shardName = get_rel_name(shardInterval->relationId);
|
||||
|
||||
AppendShardIdToName(&shardName, shardId);
|
||||
|
||||
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
|
||||
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
|
||||
|
||||
appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
|
||||
appendStringInfo(selectQuery, "%s AS shard_name, ", quotedShardName);
|
||||
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
|
||||
}
|
||||
|
||||
|
@ -1143,6 +1072,44 @@ LoadShardIntervalList(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a
|
||||
* given distributed table. The function returns an empty list if no shards can be found
|
||||
* for the given relation.
|
||||
*
|
||||
* This function does not use CitusTableCache and instead reads from catalog tables
|
||||
* directly.
|
||||
*/
|
||||
List *
|
||||
LoadUnsortedShardIntervalListViaCatalog(Oid relationId)
|
||||
{
|
||||
List *shardIntervalList = NIL;
|
||||
List *distShardTuples = LookupDistShardTuples(relationId);
|
||||
Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
|
||||
TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
|
||||
Oid intervalTypeId = InvalidOid;
|
||||
int32 intervalTypeMod = -1;
|
||||
|
||||
char partitionMethod = PartitionMethodViaCatalog(relationId);
|
||||
Var *partitionColumn = PartitionColumnViaCatalog(relationId);
|
||||
GetIntervalTypeInfo(partitionMethod, partitionColumn, &intervalTypeId,
|
||||
&intervalTypeMod);
|
||||
|
||||
HeapTuple distShardTuple = NULL;
|
||||
foreach_ptr(distShardTuple, distShardTuples)
|
||||
{
|
||||
ShardInterval *interval = TupleToShardInterval(distShardTuple,
|
||||
distShardTupleDesc,
|
||||
intervalTypeId,
|
||||
intervalTypeMod);
|
||||
shardIntervalList = lappend(shardIntervalList, interval);
|
||||
}
|
||||
table_close(distShardRelation, AccessShareLock);
|
||||
|
||||
return shardIntervalList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LoadShardIntervalWithLongestShardName is a utility function that returns
|
||||
* the shard interaval with the largest shardId for the given relationId. Note
|
||||
|
|
|
@ -123,7 +123,7 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
|
|||
* The SQL_DROP trigger calls this function even for tables that are
|
||||
* not distributed. In that case, silently ignore and return -1.
|
||||
*/
|
||||
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
|
||||
if (!IsCitusTableViaCatalog(relationId) || !EnableDDLPropagation)
|
||||
{
|
||||
PG_RETURN_INT32(-1);
|
||||
}
|
||||
|
@ -139,7 +139,7 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
|
|||
*/
|
||||
LockRelationOid(relationId, AccessExclusiveLock);
|
||||
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
List *shardIntervalList = LoadUnsortedShardIntervalListViaCatalog(relationId);
|
||||
int droppedShardCount = DropShards(relationId, schemaName, relationName,
|
||||
shardIntervalList, dropShardsMetadataOnly);
|
||||
|
||||
|
|
|
@ -65,16 +65,13 @@
|
|||
/* Local functions forward declarations */
|
||||
static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
|
||||
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
|
||||
const char *shardName, uint64 *shardSize,
|
||||
text **shardMinValue, text **shardMaxValue);
|
||||
const char *shardName, uint64 *shardSize);
|
||||
static void UpdateTableStatistics(Oid relationId);
|
||||
static void ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList);
|
||||
static void UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid
|
||||
relationId, List *shardPlacementList, uint64
|
||||
shardSize, text *shardMinValue,
|
||||
text *shardMaxValue);
|
||||
static void ReceiveAndUpdateShardsSizes(List *connectionList);
|
||||
static void UpdateShardSize(uint64 shardId, ShardInterval *shardInterval,
|
||||
Oid relationId, List *shardPlacementList,
|
||||
uint64 shardSize);
|
||||
static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
||||
text **shardMinValue, text **shardMaxValue,
|
||||
uint64 *shardSize);
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
|
@ -707,8 +704,6 @@ UpdateShardStatistics(int64 shardId)
|
|||
Oid relationId = shardInterval->relationId;
|
||||
bool statsOK = false;
|
||||
uint64 shardSize = 0;
|
||||
text *minValue = NULL;
|
||||
text *maxValue = NULL;
|
||||
|
||||
/* Build shard qualified name. */
|
||||
char *shardName = get_rel_name(relationId);
|
||||
|
@ -726,7 +721,7 @@ UpdateShardStatistics(int64 shardId)
|
|||
foreach_ptr(placement, shardPlacementList)
|
||||
{
|
||||
statsOK = WorkerShardStats(placement, relationId, shardQualifiedName,
|
||||
&shardSize, &minValue, &maxValue);
|
||||
&shardSize);
|
||||
if (statsOK)
|
||||
{
|
||||
break;
|
||||
|
@ -747,8 +742,9 @@ UpdateShardStatistics(int64 shardId)
|
|||
errdetail("Setting shard statistics to NULL")));
|
||||
}
|
||||
|
||||
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, shardPlacementList,
|
||||
shardSize, minValue, maxValue);
|
||||
UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList,
|
||||
shardSize);
|
||||
|
||||
return shardSize;
|
||||
}
|
||||
|
||||
|
@ -766,24 +762,20 @@ UpdateTableStatistics(Oid relationId)
|
|||
/* we want to use a distributed transaction here to detect distributed deadlocks */
|
||||
bool useDistributedTransaction = true;
|
||||
|
||||
/* we also want shard min/max values for append distributed tables */
|
||||
bool useShardMinMaxQuery = true;
|
||||
List *connectionList =
|
||||
SendShardStatisticsQueriesInParallel(citusTableIds, useDistributedTransaction);
|
||||
|
||||
List *connectionList = SendShardStatisticsQueriesInParallel(citusTableIds,
|
||||
useDistributedTransaction,
|
||||
useShardMinMaxQuery);
|
||||
|
||||
ReceiveAndUpdateShardsSizeAndMinMax(connectionList);
|
||||
ReceiveAndUpdateShardsSizes(connectionList);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReceiveAndUpdateShardsSizeAndMinMax receives shard id, size
|
||||
* and min max results from the given connection list, and updates
|
||||
* respective entries in pg_dist_placement and pg_dist_shard
|
||||
* ReceiveAndUpdateShardsSizes receives shard id and size
|
||||
* results from the given connection list, and updates
|
||||
* respective entries in pg_dist_placement.
|
||||
*/
|
||||
static void
|
||||
ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
||||
ReceiveAndUpdateShardsSizes(List *connectionList)
|
||||
{
|
||||
/*
|
||||
* From the connection list, we will not get all the shards, but
|
||||
|
@ -812,7 +804,7 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
|||
int64 colCount = PQnfields(result);
|
||||
|
||||
/* Although it is not expected */
|
||||
if (colCount != UPDATE_SHARD_STATISTICS_COLUMN_COUNT)
|
||||
if (colCount != SHARD_SIZES_COLUMN_COUNT)
|
||||
{
|
||||
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||
"citus_update_table_statistics")));
|
||||
|
@ -822,12 +814,9 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
|||
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||
{
|
||||
uint64 shardId = 0;
|
||||
text *shardMinValue = NULL;
|
||||
text *shardMaxValue = NULL;
|
||||
uint64 shardSize = 0;
|
||||
|
||||
if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardMinValue,
|
||||
&shardMaxValue, &shardSize))
|
||||
if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardSize))
|
||||
{
|
||||
/* this row has no valid shard statistics */
|
||||
continue;
|
||||
|
@ -845,9 +834,8 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
|||
Oid relationId = shardInterval->relationId;
|
||||
List *shardPlacementList = ActiveShardPlacementList(shardId);
|
||||
|
||||
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId,
|
||||
shardPlacementList, shardSize, shardMinValue,
|
||||
shardMaxValue);
|
||||
UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList,
|
||||
shardSize);
|
||||
}
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
|
@ -860,10 +848,13 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
|||
* ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult
|
||||
* - it returns true if this row belongs to a valid shard
|
||||
* - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID)
|
||||
*
|
||||
* Input tuples are assumed to be of the form:
|
||||
* (shard_id bigint, shard_name text, shard_size bigint)
|
||||
*/
|
||||
static bool
|
||||
ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
||||
text **shardMinValue, text **shardMaxValue, uint64 *shardSize)
|
||||
uint64 *shardSize)
|
||||
{
|
||||
*shardId = ParseIntField(result, rowIndex, 0);
|
||||
|
||||
|
@ -874,28 +865,19 @@ ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
|||
return false;
|
||||
}
|
||||
|
||||
char *minValueResult = PQgetvalue(result, rowIndex, 1);
|
||||
char *maxValueResult = PQgetvalue(result, rowIndex, 2);
|
||||
*shardMinValue = cstring_to_text(minValueResult);
|
||||
*shardMaxValue = cstring_to_text(maxValueResult);
|
||||
*shardSize = ParseIntField(result, rowIndex, 3);
|
||||
*shardSize = ParseIntField(result, rowIndex, 2);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateShardSizeAndMinMax updates the shardlength (shard size) of the given
|
||||
* shard and its placements in pg_dist_placement, and updates the shard min value
|
||||
* and shard max value of the given shard in pg_dist_shard if the relationId belongs
|
||||
* to an append-distributed table
|
||||
* UpdateShardSize updates the shardlength (shard size) of the given
|
||||
* shard and its placements in pg_dist_placement.
|
||||
*/
|
||||
static void
|
||||
UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relationId,
|
||||
List *shardPlacementList, uint64 shardSize, text *shardMinValue,
|
||||
text *shardMaxValue)
|
||||
UpdateShardSize(uint64 shardId, ShardInterval *shardInterval, Oid relationId,
|
||||
List *shardPlacementList, uint64 shardSize)
|
||||
{
|
||||
char storageType = shardInterval->storageType;
|
||||
|
||||
ShardPlacement *placement = NULL;
|
||||
|
||||
/* update metadata for each shard placement */
|
||||
|
@ -906,16 +888,7 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat
|
|||
|
||||
DeleteShardPlacementRow(placementId);
|
||||
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE,
|
||||
shardSize,
|
||||
groupId);
|
||||
}
|
||||
|
||||
/* only update shard min/max values for append-partitioned tables */
|
||||
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
||||
{
|
||||
DeleteShardRow(shardId);
|
||||
InsertShardRow(relationId, shardId, storageType, shardMinValue,
|
||||
shardMaxValue);
|
||||
shardSize, groupId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -926,17 +899,10 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat
|
|||
*/
|
||||
static bool
|
||||
WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardName,
|
||||
uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
|
||||
uint64 *shardSize)
|
||||
{
|
||||
StringInfo tableSizeQuery = makeStringInfo();
|
||||
|
||||
const uint32 unusedTableId = 1;
|
||||
StringInfo partitionValueQuery = makeStringInfo();
|
||||
|
||||
PGresult *queryResult = NULL;
|
||||
const int minValueIndex = 0;
|
||||
const int maxValueIndex = 1;
|
||||
|
||||
char *tableSizeStringEnd = NULL;
|
||||
|
||||
int connectionFlags = 0;
|
||||
|
@ -951,8 +917,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
|
|||
Assert(connection != NULL);
|
||||
|
||||
*shardSize = 0;
|
||||
*shardMinValue = NULL;
|
||||
*shardMaxValue = NULL;
|
||||
|
||||
char *quotedShardName = quote_literal_cstr(shardName);
|
||||
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
|
||||
|
@ -986,40 +950,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
|
|||
PQclear(queryResult);
|
||||
ForgetResults(connection);
|
||||
|
||||
if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
||||
{
|
||||
/* we don't need min/max for non-append distributed tables */
|
||||
return true;
|
||||
}
|
||||
|
||||
/* fill in the partition column name and shard name in the query. */
|
||||
Var *partitionColumn = PartitionColumn(relationId, unusedTableId);
|
||||
char *partitionColumnName = get_attname(relationId, partitionColumn->varattno, false);
|
||||
appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY,
|
||||
partitionColumnName, partitionColumnName, shardName);
|
||||
|
||||
executeCommand = ExecuteOptionalRemoteCommand(connection, partitionValueQuery->data,
|
||||
&queryResult);
|
||||
if (executeCommand != 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool minValueIsNull = PQgetisnull(queryResult, 0, minValueIndex);
|
||||
bool maxValueIsNull = PQgetisnull(queryResult, 0, maxValueIndex);
|
||||
|
||||
if (!minValueIsNull && !maxValueIsNull)
|
||||
{
|
||||
char *minValueResult = PQgetvalue(queryResult, 0, minValueIndex);
|
||||
char *maxValueResult = PQgetvalue(queryResult, 0, maxValueIndex);
|
||||
|
||||
*shardMinValue = cstring_to_text(minValueResult);
|
||||
*shardMaxValue = cstring_to_text(maxValueResult);
|
||||
}
|
||||
|
||||
PQclear(queryResult);
|
||||
ForgetResults(connection);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -144,9 +144,11 @@ extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
|
|||
CitusTableType tableType);
|
||||
|
||||
extern bool IsCitusTable(Oid relationId);
|
||||
extern bool IsCitusTableViaCatalog(Oid relationId);
|
||||
extern char PgDistPartitionViaCatalog(Oid relationId);
|
||||
extern List * LookupDistShardTuples(Oid relationId);
|
||||
extern char PartitionMethodViaCatalog(Oid relationId);
|
||||
extern Var * PartitionColumnViaCatalog(Oid relationId);
|
||||
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel);
|
||||
extern List * CitusTableList(void);
|
||||
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
||||
|
|
|
@ -31,6 +31,7 @@ typedef enum
|
|||
extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
|
||||
extern bool ClusterHasKnownMetadataWorkers(void);
|
||||
extern bool ShouldSyncTableMetadata(Oid relationId);
|
||||
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
|
||||
extern List * MetadataCreateCommands(void);
|
||||
extern List * MetadataDropCommands(void);
|
||||
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);
|
||||
|
|
|
@ -39,8 +39,7 @@
|
|||
#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
|
||||
"worker_partitioned_relation_total_size(%s)"
|
||||
|
||||
#define SHARD_SIZES_COLUMN_COUNT 2
|
||||
#define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4
|
||||
#define SHARD_SIZES_COLUMN_COUNT (3)
|
||||
|
||||
/* In-memory representation of a typed tuple in pg_dist_shard. */
|
||||
typedef struct ShardInterval
|
||||
|
@ -202,6 +201,7 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS);
|
|||
/* Function declarations to read shard and shard placement data */
|
||||
extern uint32 TableShardReplicationFactor(Oid relationId);
|
||||
extern List * LoadShardIntervalList(Oid relationId);
|
||||
extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId);
|
||||
extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId);
|
||||
extern int ShardIntervalCount(Oid relationId);
|
||||
extern List * LoadShardList(Oid relationId);
|
||||
|
@ -280,9 +280,8 @@ extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray,
|
|||
int32 intervalTypeMod);
|
||||
extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
|
||||
Oid *intervalTypeId, int32 *intervalTypeMod);
|
||||
extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool
|
||||
useDistributedTransaction, bool
|
||||
useShardMinMaxQuery);
|
||||
extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds,
|
||||
bool useDistributedTransaction);
|
||||
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
|
||||
uint64 *availableBytes,
|
||||
uint64 *totalBytes);
|
||||
|
|
|
@ -64,11 +64,11 @@ SET citus.multi_shard_modify_mode TO sequential;
|
|||
SELECT citus_update_table_statistics('test_table_statistics_hash');
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing COMMIT
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
|
@ -152,11 +152,11 @@ SET citus.multi_shard_modify_mode TO sequential;
|
|||
SELECT citus_update_table_statistics('test_table_statistics_append');
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
||||
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing COMMIT
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
|
@ -181,10 +181,10 @@ WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append')
|
|||
ORDER BY 2, 3;
|
||||
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
|
||||
---------------------------------------------------------------------
|
||||
test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 1 | 3
|
||||
test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 1 | 3
|
||||
test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 5 | 7
|
||||
test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 5 | 7
|
||||
test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | |
|
||||
test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | |
|
||||
test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | |
|
||||
test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | |
|
||||
(4 rows)
|
||||
|
||||
DROP TABLE test_table_statistics_hash, test_table_statistics_append;
|
||||
|
|
|
@ -1,106 +0,0 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-drop-table s2-citus-update-table-statistics s1-commit
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-drop-table:
|
||||
DROP TABLE dist_table;
|
||||
|
||||
step s2-citus-update-table-statistics:
|
||||
SET client_min_messages TO NOTICE;
|
||||
SELECT citus_update_table_statistics(logicalrelid) FROM pg_dist_partition;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-citus-update-table-statistics: <... completed>
|
||||
s2: NOTICE: relation with OID XXXX does not exist, skipping
|
||||
citus_update_table_statistics
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-drop-table s2-citus-shards s1-commit
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-drop-table:
|
||||
DROP TABLE dist_table;
|
||||
|
||||
step s2-citus-shards:
|
||||
SELECT 1 AS result FROM citus_shards GROUP BY result;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-citus-shards: <... completed>
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s2-begin s2-citus-shards s1-drop-table s2-commit
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-citus-shards:
|
||||
SELECT 1 AS result FROM citus_shards GROUP BY result;
|
||||
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
step s1-drop-table:
|
||||
DROP TABLE dist_table;
|
||||
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
|
||||
starting permutation: s2-begin s2-citus-update-table-statistics s1-drop-table s2-commit
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-citus-update-table-statistics:
|
||||
SET client_min_messages TO NOTICE;
|
||||
SELECT citus_update_table_statistics(logicalrelid) FROM pg_dist_partition;
|
||||
|
||||
citus_update_table_statistics
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
(2 rows)
|
||||
|
||||
step s1-drop-table:
|
||||
DROP TABLE dist_table;
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-drop-table: <... completed>
|
||||
ERROR: tuple concurrently deleted
|
|
@ -0,0 +1,16 @@
|
|||
CREATE SCHEMA issue_5099;
|
||||
SET search_path to 'issue_5099';
|
||||
CREATE TYPE comp_type AS (
|
||||
int_field_1 BIGINT,
|
||||
int_field_2 BIGINT
|
||||
);
|
||||
CREATE TABLE range_dist_table_2 (dist_col comp_type);
|
||||
SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\set VERBOSITY TERSE
|
||||
DROP SCHEMA issue_5099 CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
|
@ -112,11 +112,8 @@ SELECT count(*) FROM customer_copy_hash;
|
|||
-- Confirm that data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
|
||||
-- Make sure that master_update_shard_statistics() only updates shard length for
|
||||
-- hash-partitioned tables
|
||||
SELECT master_update_shard_statistics(560000);
|
||||
|
||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000;
|
||||
-- Update shard statistics for hash-partitioned table
|
||||
SELECT citus_update_shard_statistics(560000);
|
||||
|
||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
|
||||
|
||||
|
@ -206,24 +203,13 @@ FROM customer_copy_range WHERE c_custkey <= 500;
|
|||
-- Check whether data was copied
|
||||
SELECT count(*) FROM customer_copy_range;
|
||||
|
||||
-- Manipulate min/max values and check shard statistics for new shard
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
||||
|
||||
-- Update shard statistics for range-partitioned shard and check that only the
|
||||
-- shard length is updated.
|
||||
SELECT master_update_shard_statistics(:new_shard_id);
|
||||
-- Update shard statistics for range-partitioned shard
|
||||
SELECT citus_update_shard_statistics(:new_shard_id);
|
||||
|
||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
||||
|
||||
-- Revert back min/max value updates
|
||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
-- Create a new append-partitioned table into which to COPY
|
||||
CREATE TABLE customer_copy_append (
|
||||
c_custkey integer,
|
||||
|
@ -272,16 +258,13 @@ END;
|
|||
SELECT * FROM customer_copy_append;
|
||||
|
||||
-- Manipulate manipulate and check shard statistics for append-partitioned table shard
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560132;
|
||||
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132;
|
||||
|
||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
|
||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
||||
|
||||
-- Update shard statistics for append-partitioned shard
|
||||
SELECT master_update_shard_statistics(560132);
|
||||
SELECT citus_update_shard_statistics(560132);
|
||||
|
||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
|
||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
||||
|
||||
-- Create lineitem table
|
||||
|
|
|
@ -62,7 +62,6 @@ test: isolation_insert_select_conflict
|
|||
test: shared_connection_waits
|
||||
test: isolation_cancellation
|
||||
test: isolation_undistribute_table
|
||||
test: isolation_citus_update_table_statistics
|
||||
test: isolation_fix_partition_shard_index_names
|
||||
|
||||
# Rebalancer
|
||||
|
|
|
@ -95,7 +95,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
|
|||
test: binary_protocol
|
||||
test: alter_table_set_access_method
|
||||
test: alter_distributed_table
|
||||
test: issue_5248
|
||||
test: issue_5248 issue_5099
|
||||
test: object_propagation_debug
|
||||
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,59 +0,0 @@
|
|||
setup
|
||||
{
|
||||
CREATE TABLE dist_table(a INT, b INT);
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS dist_table;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-drop-table"
|
||||
{
|
||||
DROP TABLE dist_table;
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s2-citus-update-table-statistics"
|
||||
{
|
||||
SET client_min_messages TO NOTICE;
|
||||
SELECT citus_update_table_statistics(logicalrelid) FROM pg_dist_partition;
|
||||
}
|
||||
|
||||
step "s2-citus-shards"
|
||||
{
|
||||
SELECT 1 AS result FROM citus_shards GROUP BY result;
|
||||
}
|
||||
|
||||
step "s2-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
permutation "s1-begin" "s1-drop-table" "s2-citus-update-table-statistics" "s1-commit"
|
||||
permutation "s1-begin" "s1-drop-table" "s2-citus-shards" "s1-commit"
|
||||
permutation "s2-begin" "s2-citus-shards" "s1-drop-table" "s2-commit"
|
||||
|
||||
// ERROR: tuple concurrently deleted -- is expected in the following permutation
|
||||
// Check the explanation at PR #5155 in the following comment
|
||||
// https://github.com/citusdata/citus/pull/5155#issuecomment-897028194
|
||||
permutation "s2-begin" "s2-citus-update-table-statistics" "s1-drop-table" "s2-commit"
|
|
@ -0,0 +1,11 @@
|
|||
CREATE SCHEMA issue_5099;
|
||||
SET search_path to 'issue_5099';
|
||||
CREATE TYPE comp_type AS (
|
||||
int_field_1 BIGINT,
|
||||
int_field_2 BIGINT
|
||||
);
|
||||
|
||||
CREATE TABLE range_dist_table_2 (dist_col comp_type);
|
||||
SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range');
|
||||
\set VERBOSITY TERSE
|
||||
DROP SCHEMA issue_5099 CASCADE;
|
Loading…
Reference in New Issue