mirror of https://github.com/citusdata/citus.git
Merge pull request #5481 from citusdata/marcocitus/remove-shard-range-update
commit
7694569976
|
@ -80,26 +80,18 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||||
SizeQueryType sizeQueryType, bool failOnError,
|
SizeQueryType sizeQueryType, bool failOnError,
|
||||||
uint64 *tableSize);
|
uint64 *tableSize);
|
||||||
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
||||||
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
|
||||||
useShardMinMaxQuery);
|
|
||||||
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
|
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
|
||||||
static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
|
static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
|
||||||
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
|
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
|
||||||
List *citusTableIds, bool
|
List *citusTableIds);
|
||||||
useShardMinMaxQuery);
|
static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds);
|
||||||
static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds,
|
|
||||||
bool useShardMinMaxQuery);
|
|
||||||
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
||||||
static List * OpenConnectionToNodes(List *workerNodeList);
|
static List * OpenConnectionToNodes(List *workerNodeList);
|
||||||
static void ReceiveShardNameAndSizeResults(List *connectionList,
|
static void ReceiveShardNameAndSizeResults(List *connectionList,
|
||||||
Tuplestorestate *tupleStore,
|
Tuplestorestate *tupleStore,
|
||||||
TupleDesc tupleDescriptor);
|
TupleDesc tupleDescriptor);
|
||||||
static void AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
|
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval);
|
||||||
ShardInterval *
|
|
||||||
shardInterval, char *shardName,
|
|
||||||
char *quotedShardName);
|
|
||||||
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
|
|
||||||
char *quotedShardName);
|
|
||||||
|
|
||||||
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
|
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
|
||||||
uint64 totalBytes);
|
uint64 totalBytes);
|
||||||
|
@ -245,11 +237,8 @@ citus_shard_sizes(PG_FUNCTION_ARGS)
|
||||||
/* we don't need a distributed transaction here */
|
/* we don't need a distributed transaction here */
|
||||||
bool useDistributedTransaction = false;
|
bool useDistributedTransaction = false;
|
||||||
|
|
||||||
/* we only want the shard sizes here so useShardMinMaxQuery parameter is false */
|
List *connectionList =
|
||||||
bool useShardMinMaxQuery = false;
|
SendShardStatisticsQueriesInParallel(allCitusTableIds, useDistributedTransaction);
|
||||||
List *connectionList = SendShardStatisticsQueriesInParallel(allCitusTableIds,
|
|
||||||
useDistributedTransaction,
|
|
||||||
useShardMinMaxQuery);
|
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||||
|
@ -342,15 +331,12 @@ citus_relation_size(PG_FUNCTION_ARGS)
|
||||||
* to available nodes. It returns the connection list.
|
* to available nodes. It returns the connection list.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction,
|
SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction)
|
||||||
bool
|
|
||||||
useShardMinMaxQuery)
|
|
||||||
{
|
{
|
||||||
List *workerNodeList = ActivePrimaryNodeList(NoLock);
|
List *workerNodeList = ActivePrimaryNodeList(NoLock);
|
||||||
|
|
||||||
List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList,
|
List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList,
|
||||||
citusTableIds,
|
citusTableIds);
|
||||||
useShardMinMaxQuery);
|
|
||||||
|
|
||||||
List *connectionList = OpenConnectionToNodes(workerNodeList);
|
List *connectionList = OpenConnectionToNodes(workerNodeList);
|
||||||
FinishConnectionListEstablishment(connectionList);
|
FinishConnectionListEstablishment(connectionList);
|
||||||
|
@ -415,20 +401,18 @@ OpenConnectionToNodes(List *workerNodeList)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateShardStatisticsQueryList generates a query per node that will return:
|
* GenerateShardStatisticsQueryList generates a query per node that will return:
|
||||||
* - all shard_name, shard_size pairs from the node (if includeShardMinMax is false)
|
* shard_id, shard_name, shard_size for all shard placements on the node
|
||||||
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples from the node (if true)
|
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, bool
|
GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds)
|
||||||
useShardMinMaxQuery)
|
|
||||||
{
|
{
|
||||||
List *shardStatisticsQueryList = NIL;
|
List *shardStatisticsQueryList = NIL;
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
{
|
{
|
||||||
char *shardStatisticsQuery = GenerateAllShardStatisticsQueryForNode(workerNode,
|
char *shardStatisticsQuery =
|
||||||
citusTableIds,
|
GenerateAllShardStatisticsQueryForNode(workerNode, citusTableIds);
|
||||||
useShardMinMaxQuery);
|
|
||||||
shardStatisticsQueryList = lappend(shardStatisticsQueryList,
|
shardStatisticsQueryList = lappend(shardStatisticsQueryList,
|
||||||
shardStatisticsQuery);
|
shardStatisticsQuery);
|
||||||
}
|
}
|
||||||
|
@ -479,12 +463,13 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore
|
||||||
memset(values, 0, sizeof(values));
|
memset(values, 0, sizeof(values));
|
||||||
memset(isNulls, false, sizeof(isNulls));
|
memset(isNulls, false, sizeof(isNulls));
|
||||||
|
|
||||||
char *tableName = PQgetvalue(result, rowIndex, 0);
|
/* format is [0] shard id, [1] shard name, [2] size */
|
||||||
|
char *tableName = PQgetvalue(result, rowIndex, 1);
|
||||||
Datum resultStringDatum = CStringGetDatum(tableName);
|
Datum resultStringDatum = CStringGetDatum(tableName);
|
||||||
Datum textDatum = DirectFunctionCall1(textin, resultStringDatum);
|
Datum textDatum = DirectFunctionCall1(textin, resultStringDatum);
|
||||||
|
|
||||||
values[0] = textDatum;
|
values[0] = textDatum;
|
||||||
values[1] = ParseIntField(result, rowIndex, 1);
|
values[1] = ParseIntField(result, rowIndex, 2);
|
||||||
|
|
||||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||||
}
|
}
|
||||||
|
@ -858,12 +843,10 @@ GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateAllShardStatisticsQueryForNode generates a query that returns:
|
* GenerateAllShardStatisticsQueryForNode generates a query that returns:
|
||||||
* - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false)
|
* shard_id, shard_name, shard_size for all shard placements on the node
|
||||||
* - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples (if true)
|
|
||||||
*/
|
*/
|
||||||
static char *
|
static char *
|
||||||
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds, bool
|
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds)
|
||||||
useShardMinMaxQuery)
|
|
||||||
{
|
{
|
||||||
StringInfo allShardStatisticsQuery = makeStringInfo();
|
StringInfo allShardStatisticsQuery = makeStringInfo();
|
||||||
|
|
||||||
|
@ -881,61 +864,32 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI
|
||||||
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
|
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
|
||||||
relationId);
|
relationId);
|
||||||
char *shardStatisticsQuery =
|
char *shardStatisticsQuery =
|
||||||
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode,
|
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode);
|
||||||
useShardMinMaxQuery);
|
|
||||||
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
|
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
|
||||||
relation_close(relation, AccessShareLock);
|
relation_close(relation, AccessShareLock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a dummy entry so that UNION ALL doesn't complain */
|
/* Add a dummy entry so that UNION ALL doesn't complain */
|
||||||
if (useShardMinMaxQuery)
|
appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, NULL::text, 0::bigint;");
|
||||||
{
|
|
||||||
/* 0 for shard_id, NULL for min, NULL for text, 0 for shard_size */
|
|
||||||
appendStringInfo(allShardStatisticsQuery,
|
|
||||||
"SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* NULL for shard_name, 0 for shard_size */
|
|
||||||
appendStringInfo(allShardStatisticsQuery, "SELECT NULL::text, 0::bigint;");
|
|
||||||
}
|
|
||||||
return allShardStatisticsQuery->data;
|
return allShardStatisticsQuery->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateShardStatisticsQueryForShardList generates one of the two types of queries:
|
* GenerateShardStatisticsQueryForShardList generates a query that returns:
|
||||||
* - SELECT shard_name - shard_size (if useShardMinMaxQuery is false)
|
* SELECT shard_id, shard_name, shard_size for all shards in the list
|
||||||
* - SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size (if true)
|
|
||||||
*/
|
*/
|
||||||
static char *
|
static char *
|
||||||
GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
|
||||||
useShardMinMaxQuery)
|
|
||||||
{
|
{
|
||||||
StringInfo selectQuery = makeStringInfo();
|
StringInfo selectQuery = makeStringInfo();
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
foreach_ptr(shardInterval, shardIntervalList)
|
||||||
{
|
{
|
||||||
uint64 shardId = shardInterval->shardId;
|
AppendShardSizeQuery(selectQuery, shardInterval);
|
||||||
Oid schemaId = get_rel_namespace(shardInterval->relationId);
|
|
||||||
char *schemaName = get_namespace_name(schemaId);
|
|
||||||
char *shardName = get_rel_name(shardInterval->relationId);
|
|
||||||
AppendShardIdToName(&shardName, shardId);
|
|
||||||
|
|
||||||
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
|
|
||||||
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
|
|
||||||
|
|
||||||
if (useShardMinMaxQuery)
|
|
||||||
{
|
|
||||||
AppendShardSizeMinMaxQuery(selectQuery, shardId, shardInterval, shardName,
|
|
||||||
quotedShardName);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
AppendShardSizeQuery(selectQuery, shardInterval, quotedShardName);
|
|
||||||
}
|
|
||||||
appendStringInfo(selectQuery, " UNION ALL ");
|
appendStringInfo(selectQuery, " UNION ALL ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -943,50 +897,25 @@ GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* AppendShardSizeMinMaxQuery appends a query in the following form to selectQuery
|
|
||||||
* SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId,
|
|
||||||
ShardInterval *shardInterval, char *shardName,
|
|
||||||
char *quotedShardName)
|
|
||||||
{
|
|
||||||
if (IsCitusTableType(shardInterval->relationId, APPEND_DISTRIBUTED))
|
|
||||||
{
|
|
||||||
/* fill in the partition column name */
|
|
||||||
const uint32 unusedTableId = 1;
|
|
||||||
Var *partitionColumn = PartitionColumn(shardInterval->relationId,
|
|
||||||
unusedTableId);
|
|
||||||
char *partitionColumnName = get_attname(shardInterval->relationId,
|
|
||||||
partitionColumn->varattno, false);
|
|
||||||
appendStringInfo(selectQuery,
|
|
||||||
"SELECT " UINT64_FORMAT
|
|
||||||
" AS shard_id, min(%s)::text AS shard_minvalue, max(%s)::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size FROM %s ",
|
|
||||||
shardId, partitionColumnName,
|
|
||||||
partitionColumnName,
|
|
||||||
quotedShardName, shardName);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* we don't need to update min/max for non-append distributed tables because they don't change */
|
|
||||||
appendStringInfo(selectQuery,
|
|
||||||
"SELECT " UINT64_FORMAT
|
|
||||||
" AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size ",
|
|
||||||
shardId, quotedShardName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AppendShardSizeQuery appends a query in the following form to selectQuery
|
* AppendShardSizeQuery appends a query in the following form to selectQuery
|
||||||
* SELECT shard_name, shard_size
|
* SELECT shard_id, shard_name, shard_size
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval,
|
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
|
||||||
char *quotedShardName)
|
|
||||||
{
|
{
|
||||||
appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName);
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
Oid schemaId = get_rel_namespace(shardInterval->relationId);
|
||||||
|
char *schemaName = get_namespace_name(schemaId);
|
||||||
|
char *shardName = get_rel_name(shardInterval->relationId);
|
||||||
|
|
||||||
|
AppendShardIdToName(&shardName, shardId);
|
||||||
|
|
||||||
|
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
|
||||||
|
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
|
||||||
|
|
||||||
|
appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
|
||||||
|
appendStringInfo(selectQuery, "%s AS shard_name, ", quotedShardName);
|
||||||
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
|
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,16 +65,13 @@
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
|
static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
|
||||||
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
|
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
|
||||||
const char *shardName, uint64 *shardSize,
|
const char *shardName, uint64 *shardSize);
|
||||||
text **shardMinValue, text **shardMaxValue);
|
|
||||||
static void UpdateTableStatistics(Oid relationId);
|
static void UpdateTableStatistics(Oid relationId);
|
||||||
static void ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList);
|
static void ReceiveAndUpdateShardsSizes(List *connectionList);
|
||||||
static void UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid
|
static void UpdateShardSize(uint64 shardId, ShardInterval *shardInterval,
|
||||||
relationId, List *shardPlacementList, uint64
|
Oid relationId, List *shardPlacementList,
|
||||||
shardSize, text *shardMinValue,
|
uint64 shardSize);
|
||||||
text *shardMaxValue);
|
|
||||||
static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
||||||
text **shardMinValue, text **shardMaxValue,
|
|
||||||
uint64 *shardSize);
|
uint64 *shardSize);
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
|
@ -707,8 +704,6 @@ UpdateShardStatistics(int64 shardId)
|
||||||
Oid relationId = shardInterval->relationId;
|
Oid relationId = shardInterval->relationId;
|
||||||
bool statsOK = false;
|
bool statsOK = false;
|
||||||
uint64 shardSize = 0;
|
uint64 shardSize = 0;
|
||||||
text *minValue = NULL;
|
|
||||||
text *maxValue = NULL;
|
|
||||||
|
|
||||||
/* Build shard qualified name. */
|
/* Build shard qualified name. */
|
||||||
char *shardName = get_rel_name(relationId);
|
char *shardName = get_rel_name(relationId);
|
||||||
|
@ -726,7 +721,7 @@ UpdateShardStatistics(int64 shardId)
|
||||||
foreach_ptr(placement, shardPlacementList)
|
foreach_ptr(placement, shardPlacementList)
|
||||||
{
|
{
|
||||||
statsOK = WorkerShardStats(placement, relationId, shardQualifiedName,
|
statsOK = WorkerShardStats(placement, relationId, shardQualifiedName,
|
||||||
&shardSize, &minValue, &maxValue);
|
&shardSize);
|
||||||
if (statsOK)
|
if (statsOK)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
|
@ -747,8 +742,9 @@ UpdateShardStatistics(int64 shardId)
|
||||||
errdetail("Setting shard statistics to NULL")));
|
errdetail("Setting shard statistics to NULL")));
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, shardPlacementList,
|
UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList,
|
||||||
shardSize, minValue, maxValue);
|
shardSize);
|
||||||
|
|
||||||
return shardSize;
|
return shardSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -766,24 +762,20 @@ UpdateTableStatistics(Oid relationId)
|
||||||
/* we want to use a distributed transaction here to detect distributed deadlocks */
|
/* we want to use a distributed transaction here to detect distributed deadlocks */
|
||||||
bool useDistributedTransaction = true;
|
bool useDistributedTransaction = true;
|
||||||
|
|
||||||
/* we also want shard min/max values for append distributed tables */
|
List *connectionList =
|
||||||
bool useShardMinMaxQuery = true;
|
SendShardStatisticsQueriesInParallel(citusTableIds, useDistributedTransaction);
|
||||||
|
|
||||||
List *connectionList = SendShardStatisticsQueriesInParallel(citusTableIds,
|
ReceiveAndUpdateShardsSizes(connectionList);
|
||||||
useDistributedTransaction,
|
|
||||||
useShardMinMaxQuery);
|
|
||||||
|
|
||||||
ReceiveAndUpdateShardsSizeAndMinMax(connectionList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReceiveAndUpdateShardsSizeAndMinMax receives shard id, size
|
* ReceiveAndUpdateShardsSizes receives shard id and size
|
||||||
* and min max results from the given connection list, and updates
|
* results from the given connection list, and updates
|
||||||
* respective entries in pg_dist_placement and pg_dist_shard
|
* respective entries in pg_dist_placement.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
ReceiveAndUpdateShardsSizes(List *connectionList)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* From the connection list, we will not get all the shards, but
|
* From the connection list, we will not get all the shards, but
|
||||||
|
@ -812,7 +804,7 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
||||||
int64 colCount = PQnfields(result);
|
int64 colCount = PQnfields(result);
|
||||||
|
|
||||||
/* Although it is not expected */
|
/* Although it is not expected */
|
||||||
if (colCount != UPDATE_SHARD_STATISTICS_COLUMN_COUNT)
|
if (colCount != SHARD_SIZES_COLUMN_COUNT)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("unexpected number of columns from "
|
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||||
"citus_update_table_statistics")));
|
"citus_update_table_statistics")));
|
||||||
|
@ -822,12 +814,9 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
||||||
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||||
{
|
{
|
||||||
uint64 shardId = 0;
|
uint64 shardId = 0;
|
||||||
text *shardMinValue = NULL;
|
|
||||||
text *shardMaxValue = NULL;
|
|
||||||
uint64 shardSize = 0;
|
uint64 shardSize = 0;
|
||||||
|
|
||||||
if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardMinValue,
|
if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardSize))
|
||||||
&shardMaxValue, &shardSize))
|
|
||||||
{
|
{
|
||||||
/* this row has no valid shard statistics */
|
/* this row has no valid shard statistics */
|
||||||
continue;
|
continue;
|
||||||
|
@ -845,9 +834,8 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
||||||
Oid relationId = shardInterval->relationId;
|
Oid relationId = shardInterval->relationId;
|
||||||
List *shardPlacementList = ActiveShardPlacementList(shardId);
|
List *shardPlacementList = ActiveShardPlacementList(shardId);
|
||||||
|
|
||||||
UpdateShardSizeAndMinMax(shardId, shardInterval, relationId,
|
UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList,
|
||||||
shardPlacementList, shardSize, shardMinValue,
|
shardSize);
|
||||||
shardMaxValue);
|
|
||||||
}
|
}
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
|
@ -860,10 +848,13 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList)
|
||||||
* ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult
|
* ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult
|
||||||
* - it returns true if this row belongs to a valid shard
|
* - it returns true if this row belongs to a valid shard
|
||||||
* - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID)
|
* - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID)
|
||||||
|
*
|
||||||
|
* Input tuples are assumed to be of the form:
|
||||||
|
* (shard_id bigint, shard_name text, shard_size bigint)
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
||||||
text **shardMinValue, text **shardMaxValue, uint64 *shardSize)
|
uint64 *shardSize)
|
||||||
{
|
{
|
||||||
*shardId = ParseIntField(result, rowIndex, 0);
|
*shardId = ParseIntField(result, rowIndex, 0);
|
||||||
|
|
||||||
|
@ -874,28 +865,19 @@ ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *minValueResult = PQgetvalue(result, rowIndex, 1);
|
*shardSize = ParseIntField(result, rowIndex, 2);
|
||||||
char *maxValueResult = PQgetvalue(result, rowIndex, 2);
|
|
||||||
*shardMinValue = cstring_to_text(minValueResult);
|
|
||||||
*shardMaxValue = cstring_to_text(maxValueResult);
|
|
||||||
*shardSize = ParseIntField(result, rowIndex, 3);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateShardSizeAndMinMax updates the shardlength (shard size) of the given
|
* UpdateShardSize updates the shardlength (shard size) of the given
|
||||||
* shard and its placements in pg_dist_placement, and updates the shard min value
|
* shard and its placements in pg_dist_placement.
|
||||||
* and shard max value of the given shard in pg_dist_shard if the relationId belongs
|
|
||||||
* to an append-distributed table
|
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relationId,
|
UpdateShardSize(uint64 shardId, ShardInterval *shardInterval, Oid relationId,
|
||||||
List *shardPlacementList, uint64 shardSize, text *shardMinValue,
|
List *shardPlacementList, uint64 shardSize)
|
||||||
text *shardMaxValue)
|
|
||||||
{
|
{
|
||||||
char storageType = shardInterval->storageType;
|
|
||||||
|
|
||||||
ShardPlacement *placement = NULL;
|
ShardPlacement *placement = NULL;
|
||||||
|
|
||||||
/* update metadata for each shard placement */
|
/* update metadata for each shard placement */
|
||||||
|
@ -906,16 +888,7 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat
|
||||||
|
|
||||||
DeleteShardPlacementRow(placementId);
|
DeleteShardPlacementRow(placementId);
|
||||||
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE,
|
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE,
|
||||||
shardSize,
|
shardSize, groupId);
|
||||||
groupId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* only update shard min/max values for append-partitioned tables */
|
|
||||||
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
|
||||||
{
|
|
||||||
DeleteShardRow(shardId);
|
|
||||||
InsertShardRow(relationId, shardId, storageType, shardMinValue,
|
|
||||||
shardMaxValue);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -926,17 +899,10 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardName,
|
WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardName,
|
||||||
uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
|
uint64 *shardSize)
|
||||||
{
|
{
|
||||||
StringInfo tableSizeQuery = makeStringInfo();
|
StringInfo tableSizeQuery = makeStringInfo();
|
||||||
|
|
||||||
const uint32 unusedTableId = 1;
|
|
||||||
StringInfo partitionValueQuery = makeStringInfo();
|
|
||||||
|
|
||||||
PGresult *queryResult = NULL;
|
PGresult *queryResult = NULL;
|
||||||
const int minValueIndex = 0;
|
|
||||||
const int maxValueIndex = 1;
|
|
||||||
|
|
||||||
char *tableSizeStringEnd = NULL;
|
char *tableSizeStringEnd = NULL;
|
||||||
|
|
||||||
int connectionFlags = 0;
|
int connectionFlags = 0;
|
||||||
|
@ -951,8 +917,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
|
||||||
Assert(connection != NULL);
|
Assert(connection != NULL);
|
||||||
|
|
||||||
*shardSize = 0;
|
*shardSize = 0;
|
||||||
*shardMinValue = NULL;
|
|
||||||
*shardMaxValue = NULL;
|
|
||||||
|
|
||||||
char *quotedShardName = quote_literal_cstr(shardName);
|
char *quotedShardName = quote_literal_cstr(shardName);
|
||||||
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
|
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
|
||||||
|
@ -986,40 +950,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
|
||||||
PQclear(queryResult);
|
PQclear(queryResult);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
|
|
||||||
if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
|
||||||
{
|
|
||||||
/* we don't need min/max for non-append distributed tables */
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* fill in the partition column name and shard name in the query. */
|
|
||||||
Var *partitionColumn = PartitionColumn(relationId, unusedTableId);
|
|
||||||
char *partitionColumnName = get_attname(relationId, partitionColumn->varattno, false);
|
|
||||||
appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY,
|
|
||||||
partitionColumnName, partitionColumnName, shardName);
|
|
||||||
|
|
||||||
executeCommand = ExecuteOptionalRemoteCommand(connection, partitionValueQuery->data,
|
|
||||||
&queryResult);
|
|
||||||
if (executeCommand != 0)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool minValueIsNull = PQgetisnull(queryResult, 0, minValueIndex);
|
|
||||||
bool maxValueIsNull = PQgetisnull(queryResult, 0, maxValueIndex);
|
|
||||||
|
|
||||||
if (!minValueIsNull && !maxValueIsNull)
|
|
||||||
{
|
|
||||||
char *minValueResult = PQgetvalue(queryResult, 0, minValueIndex);
|
|
||||||
char *maxValueResult = PQgetvalue(queryResult, 0, maxValueIndex);
|
|
||||||
|
|
||||||
*shardMinValue = cstring_to_text(minValueResult);
|
|
||||||
*shardMaxValue = cstring_to_text(maxValueResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(queryResult);
|
|
||||||
ForgetResults(connection);
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,8 +39,7 @@
|
||||||
#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
|
#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
|
||||||
"worker_partitioned_relation_total_size(%s)"
|
"worker_partitioned_relation_total_size(%s)"
|
||||||
|
|
||||||
#define SHARD_SIZES_COLUMN_COUNT 2
|
#define SHARD_SIZES_COLUMN_COUNT (3)
|
||||||
#define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4
|
|
||||||
|
|
||||||
/* In-memory representation of a typed tuple in pg_dist_shard. */
|
/* In-memory representation of a typed tuple in pg_dist_shard. */
|
||||||
typedef struct ShardInterval
|
typedef struct ShardInterval
|
||||||
|
@ -281,9 +280,8 @@ extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray,
|
||||||
int32 intervalTypeMod);
|
int32 intervalTypeMod);
|
||||||
extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
|
extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
|
||||||
Oid *intervalTypeId, int32 *intervalTypeMod);
|
Oid *intervalTypeId, int32 *intervalTypeMod);
|
||||||
extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool
|
extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds,
|
||||||
useDistributedTransaction, bool
|
bool useDistributedTransaction);
|
||||||
useShardMinMaxQuery);
|
|
||||||
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
|
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
|
||||||
uint64 *availableBytes,
|
uint64 *availableBytes,
|
||||||
uint64 *totalBytes);
|
uint64 *totalBytes);
|
||||||
|
|
|
@ -64,11 +64,11 @@ SET citus.multi_shard_modify_mode TO sequential;
|
||||||
SELECT citus_update_table_statistics('test_table_statistics_hash');
|
SELECT citus_update_table_statistics('test_table_statistics_hash');
|
||||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing COMMIT
|
NOTICE: issuing COMMIT
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
@ -152,11 +152,11 @@ SET citus.multi_shard_modify_mode TO sequential;
|
||||||
SELECT citus_update_table_statistics('test_table_statistics_append');
|
SELECT citus_update_table_statistics('test_table_statistics_append');
|
||||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;
|
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing COMMIT
|
NOTICE: issuing COMMIT
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
@ -181,10 +181,10 @@ WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append')
|
||||||
ORDER BY 2, 3;
|
ORDER BY 2, 3;
|
||||||
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
|
tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 1 | 3
|
test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | |
|
||||||
test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 1 | 3
|
test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | |
|
||||||
test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 5 | 7
|
test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | |
|
||||||
test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 5 | 7
|
test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | |
|
||||||
(4 rows)
|
(4 rows)
|
||||||
|
|
||||||
DROP TABLE test_table_statistics_hash, test_table_statistics_append;
|
DROP TABLE test_table_statistics_hash, test_table_statistics_append;
|
||||||
|
|
|
@ -1,106 +0,0 @@
|
||||||
Parsed test spec with 2 sessions
|
|
||||||
|
|
||||||
starting permutation: s1-begin s1-drop-table s2-citus-update-table-statistics s1-commit
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s1-begin:
|
|
||||||
BEGIN;
|
|
||||||
|
|
||||||
step s1-drop-table:
|
|
||||||
DROP TABLE dist_table;
|
|
||||||
|
|
||||||
step s2-citus-update-table-statistics:
|
|
||||||
SET client_min_messages TO NOTICE;
|
|
||||||
SELECT citus_update_table_statistics(logicalrelid) FROM pg_dist_partition;
|
|
||||||
<waiting ...>
|
|
||||||
step s1-commit:
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
step s2-citus-update-table-statistics: <... completed>
|
|
||||||
s2: NOTICE: relation with OID XXXX does not exist, skipping
|
|
||||||
citus_update_table_statistics
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
(2 rows)
|
|
||||||
|
|
||||||
|
|
||||||
starting permutation: s1-begin s1-drop-table s2-citus-shards s1-commit
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s1-begin:
|
|
||||||
BEGIN;
|
|
||||||
|
|
||||||
step s1-drop-table:
|
|
||||||
DROP TABLE dist_table;
|
|
||||||
|
|
||||||
step s2-citus-shards:
|
|
||||||
SELECT 1 AS result FROM citus_shards GROUP BY result;
|
|
||||||
<waiting ...>
|
|
||||||
step s1-commit:
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
step s2-citus-shards: <... completed>
|
|
||||||
result
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
1
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
|
|
||||||
starting permutation: s2-begin s2-citus-shards s1-drop-table s2-commit
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s2-begin:
|
|
||||||
BEGIN;
|
|
||||||
|
|
||||||
step s2-citus-shards:
|
|
||||||
SELECT 1 AS result FROM citus_shards GROUP BY result;
|
|
||||||
|
|
||||||
result
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
1
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s1-drop-table:
|
|
||||||
DROP TABLE dist_table;
|
|
||||||
|
|
||||||
step s2-commit:
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
|
|
||||||
starting permutation: s2-begin s2-citus-update-table-statistics s1-drop-table s2-commit
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s2-begin:
|
|
||||||
BEGIN;
|
|
||||||
|
|
||||||
step s2-citus-update-table-statistics:
|
|
||||||
SET client_min_messages TO NOTICE;
|
|
||||||
SELECT citus_update_table_statistics(logicalrelid) FROM pg_dist_partition;
|
|
||||||
|
|
||||||
citus_update_table_statistics
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
(2 rows)
|
|
||||||
|
|
||||||
step s1-drop-table:
|
|
||||||
DROP TABLE dist_table;
|
|
||||||
<waiting ...>
|
|
||||||
step s2-commit:
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
step s1-drop-table: <... completed>
|
|
||||||
ERROR: tuple concurrently deleted
|
|
|
@ -112,11 +112,8 @@ SELECT count(*) FROM customer_copy_hash;
|
||||||
-- Confirm that data was copied
|
-- Confirm that data was copied
|
||||||
SELECT count(*) FROM customer_copy_hash;
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
|
||||||
-- Make sure that master_update_shard_statistics() only updates shard length for
|
-- Update shard statistics for hash-partitioned table
|
||||||
-- hash-partitioned tables
|
SELECT citus_update_shard_statistics(560000);
|
||||||
SELECT master_update_shard_statistics(560000);
|
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000;
|
|
||||||
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
|
||||||
|
|
||||||
|
@ -206,24 +203,13 @@ FROM customer_copy_range WHERE c_custkey <= 500;
|
||||||
-- Check whether data was copied
|
-- Check whether data was copied
|
||||||
SELECT count(*) FROM customer_copy_range;
|
SELECT count(*) FROM customer_copy_range;
|
||||||
|
|
||||||
-- Manipulate min/max values and check shard statistics for new shard
|
|
||||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000
|
|
||||||
WHERE shardid = :new_shard_id;
|
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
-- Update shard statistics for range-partitioned shard and check that only the
|
-- Update shard statistics for range-partitioned shard
|
||||||
-- shard length is updated.
|
SELECT citus_update_shard_statistics(:new_shard_id);
|
||||||
SELECT master_update_shard_statistics(:new_shard_id);
|
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
-- Revert back min/max value updates
|
|
||||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
|
||||||
WHERE shardid = :new_shard_id;
|
|
||||||
|
|
||||||
-- Create a new append-partitioned table into which to COPY
|
-- Create a new append-partitioned table into which to COPY
|
||||||
CREATE TABLE customer_copy_append (
|
CREATE TABLE customer_copy_append (
|
||||||
c_custkey integer,
|
c_custkey integer,
|
||||||
|
@ -272,16 +258,13 @@ END;
|
||||||
SELECT * FROM customer_copy_append;
|
SELECT * FROM customer_copy_append;
|
||||||
|
|
||||||
-- Manipulate manipulate and check shard statistics for append-partitioned table shard
|
-- Manipulate manipulate and check shard statistics for append-partitioned table shard
|
||||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560132;
|
|
||||||
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132;
|
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132;
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
||||||
|
|
||||||
-- Update shard statistics for append-partitioned shard
|
-- Update shard statistics for append-partitioned shard
|
||||||
SELECT master_update_shard_statistics(560132);
|
SELECT citus_update_shard_statistics(560132);
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
||||||
|
|
||||||
-- Create lineitem table
|
-- Create lineitem table
|
||||||
|
|
|
@ -62,7 +62,6 @@ test: isolation_insert_select_conflict
|
||||||
test: shared_connection_waits
|
test: shared_connection_waits
|
||||||
test: isolation_cancellation
|
test: isolation_cancellation
|
||||||
test: isolation_undistribute_table
|
test: isolation_undistribute_table
|
||||||
test: isolation_citus_update_table_statistics
|
|
||||||
test: isolation_fix_partition_shard_index_names
|
test: isolation_fix_partition_shard_index_names
|
||||||
|
|
||||||
# Rebalancer
|
# Rebalancer
|
||||||
|
|
|
@ -15,7 +15,7 @@ CREATE TABLE customer_copy_hash (
|
||||||
primary key (c_custkey));
|
primary key (c_custkey));
|
||||||
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
|
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ DETAIL: No shards exist for distributed table "customer_copy_hash".
|
||||||
HINT: Run master_create_worker_shards to create shards and try again.
|
HINT: Run master_create_worker_shards to create shards and try again.
|
||||||
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
||||||
master_create_worker_shards
|
master_create_worker_shards
|
||||||
-----------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -34,15 +34,15 @@ SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
||||||
COPY customer_copy_hash FROM STDIN;
|
COPY customer_copy_hash FROM STDIN;
|
||||||
-- Test syntax error
|
-- Test syntax error
|
||||||
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
|
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
|
||||||
ERROR: invalid input syntax for type integer: "1,customer1"
|
ERROR: invalid input syntax for integer: "1,customer1"
|
||||||
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
|
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
|
||||||
-- Test invalid option
|
-- Test invalid option
|
||||||
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN (append_to_shard 1);
|
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN (append_to_shard xxxxx);
|
||||||
ERROR: append_to_shard is only valid for append-distributed tables
|
ERROR: append_to_shard is only valid for append-distributed tables
|
||||||
-- Confirm that no data was copied
|
-- Confirm that no data was copied
|
||||||
SELECT count(*) FROM customer_copy_hash;
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -54,7 +54,7 @@ DETAIL: Key (c_custkey)=(2) already exists.
|
||||||
-- Confirm that no data was copied
|
-- Confirm that no data was copied
|
||||||
SELECT count(*) FROM customer_copy_hash;
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
|
||||||
-- Confirm that only first row was skipped
|
-- Confirm that only first row was skipped
|
||||||
SELECT count(*) FROM customer_copy_hash;
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
3
|
3
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -74,7 +74,7 @@ WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
|
||||||
-- Confirm that value is not null
|
-- Confirm that value is not null
|
||||||
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -84,19 +84,19 @@ WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
|
||||||
-- Confirm that value is null
|
-- Confirm that value is null
|
||||||
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Test null violation
|
-- Test null violation
|
||||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||||
WITH (FORMAT 'csv');
|
WITH (FORMAT 'csv');
|
||||||
ERROR: null value in column "c_name" of relation "customer_copy_hash_560001" violates not-null constraint
|
ERROR: null value in column "c_name" violates not-null constraint
|
||||||
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
|
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
|
||||||
-- Confirm that no data was copied
|
-- Confirm that no data was copied
|
||||||
SELECT count(*) FROM customer_copy_hash;
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
5
|
5
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ WITH (DELIMITER ' ');
|
||||||
-- Confirm that data was copied
|
-- Confirm that data was copied
|
||||||
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
|
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -115,7 +115,7 @@ COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER
|
||||||
-- Confirm that data was copied
|
-- Confirm that data was copied
|
||||||
SELECT count(*) FROM customer_copy_hash;
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
1006
|
1006
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -124,27 +124,20 @@ SELECT count(*) FROM customer_copy_hash;
|
||||||
-- Confirm that data was copied
|
-- Confirm that data was copied
|
||||||
SELECT count(*) FROM customer_copy_hash;
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
2006
|
2006
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Make sure that master_update_shard_statistics() only updates shard length for
|
-- Update shard statistics for hash-partitioned table
|
||||||
-- hash-partitioned tables
|
SELECT citus_update_shard_statistics(560000);
|
||||||
SELECT master_update_shard_statistics(560000);
|
citus_update_shard_statistics
|
||||||
master_update_shard_statistics
|
---------------------------------------------------------------------
|
||||||
--------------------------------
|
|
||||||
8192
|
8192
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000;
|
|
||||||
shardid | shardminvalue | shardmaxvalue
|
|
||||||
---------+---------------+---------------
|
|
||||||
560000 | -2147483648 | -2080374785
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
|
||||||
shardid | shardlength
|
shardid | shardlength
|
||||||
---------+-------------
|
---------------------------------------------------------------------
|
||||||
560000 | 8192
|
560000 | 8192
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -155,13 +148,13 @@ CREATE TABLE customer_with_default(
|
||||||
c_time timestamp default now());
|
c_time timestamp default now());
|
||||||
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
|
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_create_worker_shards('customer_with_default', 64, 1);
|
SELECT master_create_worker_shards('customer_with_default', 64, 1);
|
||||||
master_create_worker_shards
|
master_create_worker_shards
|
||||||
-----------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -171,7 +164,7 @@ WITH (FORMAT 'csv');
|
||||||
-- Confirm that data was copied with now() function
|
-- Confirm that data was copied with now() function
|
||||||
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
|
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
2
|
2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -181,7 +174,7 @@ ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
|
||||||
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
|
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
|
||||||
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
|
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
|
||||||
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2
|
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2
|
||||||
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------+--------
|
---------------------------------------------------------------------
|
||||||
10 | customer10 | | | | | | | 1 | 5
|
10 | customer10 | | | | | | | 1 | 5
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -190,7 +183,7 @@ ALTER TABLE customer_copy_hash DROP COLUMN extra1;
|
||||||
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
|
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
|
||||||
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
|
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
|
||||||
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2
|
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2
|
||||||
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------
|
---------------------------------------------------------------------
|
||||||
11 | customer11 | | | | | | | 5
|
11 | customer11 | | | | | | | 5
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -199,7 +192,7 @@ ALTER TABLE customer_copy_hash DROP COLUMN extra2;
|
||||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
|
||||||
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
|
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
|
||||||
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
|
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
|
||||||
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------
|
---------------------------------------------------------------------
|
||||||
12 | customer12 | | | | | |
|
12 | customer12 | | | | | |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -216,7 +209,7 @@ CREATE TABLE customer_copy_range (
|
||||||
primary key (c_custkey));
|
primary key (c_custkey));
|
||||||
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
|
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -238,57 +231,38 @@ COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITE
|
||||||
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
|
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
|
||||||
FROM customer_copy_range WHERE c_custkey <= 500;
|
FROM customer_copy_range WHERE c_custkey <= 500;
|
||||||
min | max | avg | count
|
min | max | avg | count
|
||||||
-----+-----+----------------------+-------
|
---------------------------------------------------------------------
|
||||||
1 | 500 | 250.5000000000000000 | 500
|
1 | 500 | 250.5000000000000000 | 500
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Check whether data was copied
|
-- Check whether data was copied
|
||||||
SELECT count(*) FROM customer_copy_range;
|
SELECT count(*) FROM customer_copy_range;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
1000
|
1000
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Manipulate min/max values and check shard statistics for new shard
|
|
||||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000
|
|
||||||
WHERE shardid = :new_shard_id;
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
|
||||||
shardid | shardminvalue | shardmaxvalue
|
|
||||||
---------+---------------+---------------
|
|
||||||
560129 | 1501 | 2000
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
||||||
shardid | shardlength
|
shardid | shardlength
|
||||||
---------+-------------
|
---------------------------------------------------------------------
|
||||||
560129 | 0
|
560129 | 0
|
||||||
560129 | 0
|
560129 | 0
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Update shard statistics for range-partitioned shard and check that only the
|
-- Update shard statistics for range-partitioned shard
|
||||||
-- shard length is updated.
|
SELECT citus_update_shard_statistics(:new_shard_id);
|
||||||
SELECT master_update_shard_statistics(:new_shard_id);
|
citus_update_shard_statistics
|
||||||
master_update_shard_statistics
|
---------------------------------------------------------------------
|
||||||
--------------------------------
|
|
||||||
131072
|
131072
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
|
||||||
shardid | shardminvalue | shardmaxvalue
|
|
||||||
---------+---------------+---------------
|
|
||||||
560129 | 1501 | 2000
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
||||||
shardid | shardlength
|
shardid | shardlength
|
||||||
---------+-------------
|
---------------------------------------------------------------------
|
||||||
560129 | 131072
|
560129 | 131072
|
||||||
560129 | 131072
|
560129 | 131072
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Revert back min/max value updates
|
|
||||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
|
||||||
WHERE shardid = :new_shard_id;
|
|
||||||
-- Create a new append-partitioned table into which to COPY
|
-- Create a new append-partitioned table into which to COPY
|
||||||
CREATE TABLE customer_copy_append (
|
CREATE TABLE customer_copy_append (
|
||||||
c_custkey integer,
|
c_custkey integer,
|
||||||
|
@ -301,7 +275,7 @@ CREATE TABLE customer_copy_append (
|
||||||
c_comment varchar(117));
|
c_comment varchar(117));
|
||||||
SELECT create_distributed_table('customer_copy_append', 'c_custkey', 'append');
|
SELECT create_distributed_table('customer_copy_append', 'c_custkey', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -309,13 +283,13 @@ SELECT create_distributed_table('customer_copy_append', 'c_custkey', 'append');
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_create_empty_shard('customer_copy_append') AS shardid \gset
|
SELECT master_create_empty_shard('customer_copy_append') AS shardid \gset
|
||||||
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid);
|
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid);
|
||||||
ERROR: invalid input syntax for type integer: "notinteger"
|
ERROR: invalid input syntax for integer: "notinteger"
|
||||||
CONTEXT: COPY customer_copy_append, line 3, column c_custkey: "notinteger"
|
CONTEXT: COPY customer_copy_append, line 3, column c_custkey: "notinteger"
|
||||||
END;
|
END;
|
||||||
-- Test that no shard is created for failing copy
|
-- Test that no shard is created for failing copy
|
||||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
|
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -327,7 +301,7 @@ END;
|
||||||
-- Test that a shard is created
|
-- Test that a shard is created
|
||||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
|
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -339,43 +313,30 @@ END;
|
||||||
-- Check whether data was copied properly
|
-- Check whether data was copied properly
|
||||||
SELECT * FROM customer_copy_append;
|
SELECT * FROM customer_copy_append;
|
||||||
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
|
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
|
||||||
-----------+-----------+-----------+-------------+---------+-----------+--------------+-----------
|
---------------------------------------------------------------------
|
||||||
1 | customer1 | | | | | |
|
1 | customer1 | | | | | |
|
||||||
2 | customer2 | | | | | |
|
2 | customer2 | | | | | |
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Manipulate manipulate and check shard statistics for append-partitioned table shard
|
-- Manipulate manipulate and check shard statistics for append-partitioned table shard
|
||||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560132;
|
|
||||||
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132;
|
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132;
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
|
|
||||||
shardid | shardminvalue | shardmaxvalue
|
|
||||||
---------+---------------+---------------
|
|
||||||
560132 | 1501 | 2000
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
||||||
shardid | shardlength
|
shardid | shardlength
|
||||||
---------+-------------
|
---------------------------------------------------------------------
|
||||||
560132 | 0
|
560132 | 0
|
||||||
560132 | 0
|
560132 | 0
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Update shard statistics for append-partitioned shard
|
-- Update shard statistics for append-partitioned shard
|
||||||
SELECT master_update_shard_statistics(560132);
|
SELECT citus_update_shard_statistics(560132);
|
||||||
master_update_shard_statistics
|
citus_update_shard_statistics
|
||||||
--------------------------------
|
---------------------------------------------------------------------
|
||||||
8192
|
8192
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132;
|
|
||||||
shardid | shardminvalue | shardmaxvalue
|
|
||||||
---------+---------------+---------------
|
|
||||||
560132 | 1 | 2
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132;
|
||||||
shardid | shardlength
|
shardid | shardlength
|
||||||
---------+-------------
|
---------------------------------------------------------------------
|
||||||
560132 | 8192
|
560132 | 8192
|
||||||
560132 | 8192
|
560132 | 8192
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
@ -400,7 +361,7 @@ CREATE TABLE lineitem_copy_append (
|
||||||
l_comment varchar(44) not null);
|
l_comment varchar(44) not null);
|
||||||
SELECT create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
|
SELECT create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -410,15 +371,15 @@ COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimit
|
||||||
END;
|
END;
|
||||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
|
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- trigger some errors on the append_to_shard option
|
-- trigger some errors on the append_to_shard option
|
||||||
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard 1);
|
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard xxxxx);
|
||||||
ERROR: could not find valid entry for shard 1
|
ERROR: could not find valid entry for shard xxxxx
|
||||||
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard 560000);
|
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard xxxxx);
|
||||||
ERROR: shard 560000 does not belong to table lineitem_copy_append
|
ERROR: shard xxxxx does not belong to table lineitem_copy_append
|
||||||
-- Test schema support on append partitioned tables
|
-- Test schema support on append partitioned tables
|
||||||
CREATE SCHEMA append;
|
CREATE SCHEMA append;
|
||||||
CREATE TABLE append.customer_copy (
|
CREATE TABLE append.customer_copy (
|
||||||
|
@ -432,7 +393,7 @@ CREATE TABLE append.customer_copy (
|
||||||
c_comment varchar(117));
|
c_comment varchar(117));
|
||||||
SELECT create_distributed_table('append.customer_copy', 'c_custkey', 'append');
|
SELECT create_distributed_table('append.customer_copy', 'c_custkey', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -444,7 +405,7 @@ COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimit
|
||||||
-- Test the content of the table
|
-- Test the content of the table
|
||||||
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
|
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
|
||||||
min | max | avg | count
|
min | max | avg | count
|
||||||
-----+------+-----------------------+-------
|
---------------------------------------------------------------------
|
||||||
1 | 7000 | 4443.8028800000000000 | 2000
|
1 | 7000 | 4443.8028800000000000 | 2000
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -454,13 +415,13 @@ CREATE TABLE "customer_with_special_\\_character"(
|
||||||
c_name varchar(25) not null);
|
c_name varchar(25) not null);
|
||||||
SELECT master_create_distributed_table('"customer_with_special_\\_character"', 'c_custkey', 'hash');
|
SELECT master_create_distributed_table('"customer_with_special_\\_character"', 'c_custkey', 'hash');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_create_worker_shards('"customer_with_special_\\_character"', 4, 1);
|
SELECT master_create_worker_shards('"customer_with_special_\\_character"', 4, 1);
|
||||||
master_create_worker_shards
|
master_create_worker_shards
|
||||||
-----------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -469,7 +430,7 @@ WITH (FORMAT 'csv');
|
||||||
-- Confirm that data was copied
|
-- Confirm that data was copied
|
||||||
SELECT count(*) FROM "customer_with_special_\\_character";
|
SELECT count(*) FROM "customer_with_special_\\_character";
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
2
|
2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -479,13 +440,13 @@ CREATE TABLE "1_customer"(
|
||||||
c_name varchar(25) not null);
|
c_name varchar(25) not null);
|
||||||
SELECT master_create_distributed_table('"1_customer"', 'c_custkey', 'hash');
|
SELECT master_create_distributed_table('"1_customer"', 'c_custkey', 'hash');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_create_worker_shards('"1_customer"', 4, 1);
|
SELECT master_create_worker_shards('"1_customer"', 4, 1);
|
||||||
master_create_worker_shards
|
master_create_worker_shards
|
||||||
-----------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -494,7 +455,7 @@ WITH (FORMAT 'csv');
|
||||||
-- Confirm that data was copied
|
-- Confirm that data was copied
|
||||||
SELECT count(*) FROM "1_customer";
|
SELECT count(*) FROM "1_customer";
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
2
|
2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -514,13 +475,13 @@ CREATE TABLE packed_numbers_hash (
|
||||||
);
|
);
|
||||||
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
|
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
|
SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
|
||||||
master_create_worker_shards
|
master_create_worker_shards
|
||||||
-----------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -529,7 +490,7 @@ COPY packed_numbers_hash FROM :'temp_dir''copy_test_array_of_composite';
|
||||||
-- Verify data is actually copied
|
-- Verify data is actually copied
|
||||||
SELECT * FROM packed_numbers_hash;
|
SELECT * FROM packed_numbers_hash;
|
||||||
id | packed_numbers
|
id | packed_numbers
|
||||||
----+-----------------------
|
---------------------------------------------------------------------
|
||||||
1 | {"(42,42)","(42,42)"}
|
1 | {"(42,42)","(42,42)"}
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -540,13 +501,13 @@ CREATE TABLE super_packed_numbers_hash (
|
||||||
);
|
);
|
||||||
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
|
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
|
SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
|
||||||
master_create_worker_shards
|
master_create_worker_shards
|
||||||
-----------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -555,7 +516,7 @@ COPY super_packed_numbers_hash FROM :'temp_dir''copy_test_composite_of_composite
|
||||||
-- Verify data is actually copied
|
-- Verify data is actually copied
|
||||||
SELECT * FROM super_packed_numbers_hash;
|
SELECT * FROM super_packed_numbers_hash;
|
||||||
id | super_packed_number
|
id | super_packed_number
|
||||||
----+-----------------------
|
---------------------------------------------------------------------
|
||||||
1 | ("(42,42)","(42,42)")
|
1 | ("(42,42)","(42,42)")
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -566,7 +527,7 @@ CREATE TABLE packed_numbers_append (
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('packed_numbers_append', 'id', 'append');
|
SELECT create_distributed_table('packed_numbers_append', 'id', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -575,7 +536,7 @@ COPY packed_numbers_append FROM :'temp_dir''copy_test_array_of_composite' WITH (
|
||||||
-- Verify data is actually copied
|
-- Verify data is actually copied
|
||||||
SELECT * FROM packed_numbers_append;
|
SELECT * FROM packed_numbers_append;
|
||||||
id | packed_numbers
|
id | packed_numbers
|
||||||
----+-----------------------
|
---------------------------------------------------------------------
|
||||||
1 | {"(42,42)","(42,42)"}
|
1 | {"(42,42)","(42,42)"}
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -586,7 +547,7 @@ CREATE TABLE super_packed_numbers_append (
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('super_packed_numbers_append', 'id', 'append');
|
SELECT create_distributed_table('super_packed_numbers_append', 'id', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -595,7 +556,7 @@ COPY super_packed_numbers_append FROM :'temp_dir''copy_test_composite_of_composi
|
||||||
-- Verify data is actually copied
|
-- Verify data is actually copied
|
||||||
SELECT * FROM super_packed_numbers_append;
|
SELECT * FROM super_packed_numbers_append;
|
||||||
id | super_packed_number
|
id | super_packed_number
|
||||||
----+-----------------------
|
---------------------------------------------------------------------
|
||||||
1 | ("(42,42)","(42,42)")
|
1 | ("(42,42)","(42,42)")
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -606,7 +567,7 @@ CREATE TABLE composite_partition_column_table(
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('composite_partition_column_table', 'composite_column', 'append');
|
SELECT create_distributed_table('composite_partition_column_table', 'composite_column', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -616,7 +577,7 @@ COPY composite_partition_column_table FROM STDIN WITH (FORMAT 'csv', append_to_s
|
||||||
CREATE TABLE numbers_append (a int, b int);
|
CREATE TABLE numbers_append (a int, b int);
|
||||||
SELECT create_distributed_table('numbers_append', 'a', 'append');
|
SELECT create_distributed_table('numbers_append', 'a', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -625,7 +586,7 @@ SELECT shardid, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
||||||
shardid | nodename | nodeport
|
shardid | nodename | nodeport
|
||||||
---------+----------+----------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
SELECT master_create_empty_shard('numbers_append') AS shardid1 \gset
|
SELECT master_create_empty_shard('numbers_append') AS shardid1 \gset
|
||||||
|
@ -637,7 +598,7 @@ SELECT shardid, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
||||||
shardid | nodename | nodeport
|
shardid | nodename | nodeport
|
||||||
---------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560155 | localhost | 57637
|
560155 | localhost | 57637
|
||||||
560155 | localhost | 57638
|
560155 | localhost | 57638
|
||||||
560156 | localhost | 57638
|
560156 | localhost | 57638
|
||||||
|
@ -646,9 +607,9 @@ SELECT shardid, nodename, nodeport
|
||||||
|
|
||||||
-- disable the first node
|
-- disable the first node
|
||||||
SELECT master_disable_node('localhost', :worker_1_port);
|
SELECT master_disable_node('localhost', :worker_1_port);
|
||||||
NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57637) to activate this node back.
|
NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57637) to activate this node back.
|
||||||
master_disable_node
|
master_disable_node
|
||||||
---------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -664,7 +625,7 @@ SELECT shardid, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
||||||
shardid | nodename | nodeport
|
shardid | nodename | nodeport
|
||||||
---------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560155 | localhost | 57637
|
560155 | localhost | 57637
|
||||||
560155 | localhost | 57638
|
560155 | localhost | 57638
|
||||||
560156 | localhost | 57638
|
560156 | localhost | 57638
|
||||||
|
@ -677,7 +638,7 @@ SELECT shardid, nodename, nodeport
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
|
SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
|
||||||
?column?
|
?column?
|
||||||
----------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -692,7 +653,7 @@ SELECT shardid, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
||||||
shardid | nodename | nodeport
|
shardid | nodename | nodeport
|
||||||
---------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560155 | localhost | 57637
|
560155 | localhost | 57637
|
||||||
560155 | localhost | 57638
|
560155 | localhost | 57638
|
||||||
560156 | localhost | 57638
|
560156 | localhost | 57638
|
||||||
|
@ -713,7 +674,7 @@ NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
|
||||||
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
|
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
|
||||||
SELECT * FROM run_command_on_workers('CREATE USER test_user');
|
SELECT * FROM run_command_on_workers('CREATE USER test_user');
|
||||||
nodename | nodeport | success | result
|
nodename | nodeport | success | result
|
||||||
-----------+----------+---------+-------------
|
---------------------------------------------------------------------
|
||||||
localhost | 57637 | t | CREATE ROLE
|
localhost | 57637 | t | CREATE ROLE
|
||||||
localhost | 57638 | t | CREATE ROLE
|
localhost | 57638 | t | CREATE ROLE
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
@ -723,7 +684,7 @@ SET citus.shard_count to 4;
|
||||||
CREATE TABLE numbers_hash (a int, b int);
|
CREATE TABLE numbers_hash (a int, b int);
|
||||||
SELECT create_distributed_table('numbers_hash', 'a');
|
SELECT create_distributed_table('numbers_hash', 'a');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -733,7 +694,7 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
|
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
|
||||||
shardid | shardstate | nodename | nodeport
|
shardid | shardstate | nodename | nodeport
|
||||||
---------+------------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560161 | 1 | localhost | 57637
|
560161 | 1 | localhost | 57637
|
||||||
560161 | 1 | localhost | 57638
|
560161 | 1 | localhost | 57638
|
||||||
560162 | 1 | localhost | 57637
|
560162 | 1 | localhost | 57637
|
||||||
|
@ -748,7 +709,7 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
CREATE TABLE numbers_reference(a int, b int);
|
CREATE TABLE numbers_reference(a int, b int);
|
||||||
SELECT create_reference_table('numbers_reference');
|
SELECT create_reference_table('numbers_reference');
|
||||||
create_reference_table
|
create_reference_table
|
||||||
------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -757,7 +718,7 @@ COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
|
||||||
CREATE TABLE numbers_hash_other(a int, b int);
|
CREATE TABLE numbers_hash_other(a int, b int);
|
||||||
SELECT create_distributed_table('numbers_hash_other', 'a');
|
SELECT create_distributed_table('numbers_hash_other', 'a');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -765,7 +726,7 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
|
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
|
||||||
shardid | shardstate | nodename | nodeport
|
shardid | shardstate | nodename | nodeport
|
||||||
---------+------------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560166 | 1 | localhost | 57637
|
560166 | 1 | localhost | 57637
|
||||||
560166 | 1 | localhost | 57638
|
560166 | 1 | localhost | 57638
|
||||||
560167 | 1 | localhost | 57637
|
560167 | 1 | localhost | 57637
|
||||||
|
@ -790,14 +751,14 @@ ALTER USER test_user WITH nologin;
|
||||||
\c - test_user - :master_port
|
\c - test_user - :master_port
|
||||||
-- reissue copy, and it should fail
|
-- reissue copy, and it should fail
|
||||||
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
|
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
|
||||||
ERROR: connection to the remote node localhost:57637 failed with the following error: FATAL: role "test_user" is not permitted to log in
|
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
|
||||||
CONTEXT: COPY numbers_hash, line 1: "1,1"
|
CONTEXT: COPY numbers_hash, line 1: "1,1"
|
||||||
-- verify shards in the none of the workers as marked invalid
|
-- verify shards in the none of the workers as marked invalid
|
||||||
SELECT shardid, shardstate, nodename, nodeport
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
|
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
|
||||||
shardid | shardstate | nodename | nodeport
|
shardid | shardstate | nodename | nodeport
|
||||||
---------+------------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560161 | 1 | localhost | 57637
|
560161 | 1 | localhost | 57637
|
||||||
560161 | 1 | localhost | 57638
|
560161 | 1 | localhost | 57638
|
||||||
560162 | 1 | localhost | 57637
|
560162 | 1 | localhost | 57637
|
||||||
|
@ -810,14 +771,14 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
|
||||||
-- try to insert into a reference table copy should fail
|
-- try to insert into a reference table copy should fail
|
||||||
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
|
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
|
||||||
ERROR: connection to the remote node localhost:57637 failed with the following error: FATAL: role "test_user" is not permitted to log in
|
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
|
||||||
CONTEXT: COPY numbers_reference, line 1: "3,1"
|
CONTEXT: COPY numbers_reference, line 1: "3,1"
|
||||||
-- verify shards for reference table are still valid
|
-- verify shards for reference table are still valid
|
||||||
SELECT shardid, shardstate, nodename, nodeport
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
|
WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
|
||||||
shardid | shardstate | nodename | nodeport
|
shardid | shardstate | nodename | nodeport
|
||||||
---------+------------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560165 | 1 | localhost | 57637
|
560165 | 1 | localhost | 57637
|
||||||
560165 | 1 | localhost | 57638
|
560165 | 1 | localhost | 57638
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
@ -826,7 +787,7 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
-- since it can not insert into either copies of a shard. shards are expected to
|
-- since it can not insert into either copies of a shard. shards are expected to
|
||||||
-- stay valid since the operation is rolled back.
|
-- stay valid since the operation is rolled back.
|
||||||
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
|
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
|
||||||
ERROR: connection to the remote node localhost:57637 failed with the following error: FATAL: role "test_user" is not permitted to log in
|
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
|
||||||
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
|
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
|
||||||
-- verify shards for numbers_hash_other are still valid
|
-- verify shards for numbers_hash_other are still valid
|
||||||
-- since copy has failed altogether
|
-- since copy has failed altogether
|
||||||
|
@ -834,7 +795,7 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
|
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
|
||||||
shardid | shardstate | nodename | nodeport
|
shardid | shardstate | nodename | nodeport
|
||||||
---------+------------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560166 | 1 | localhost | 57637
|
560166 | 1 | localhost | 57637
|
||||||
560166 | 1 | localhost | 57638
|
560166 | 1 | localhost | 57638
|
||||||
560167 | 1 | localhost | 57637
|
560167 | 1 | localhost | 57637
|
||||||
|
@ -859,7 +820,7 @@ SET citus.shard_count to 4;
|
||||||
CREATE TABLE numbers_hash(a int, b int);
|
CREATE TABLE numbers_hash(a int, b int);
|
||||||
SELECT create_distributed_table('numbers_hash', 'a');
|
SELECT create_distributed_table('numbers_hash', 'a');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -869,12 +830,12 @@ ALTER TABLE numbers_hash_560170 DROP COLUMN b;
|
||||||
-- operation will fail to modify a shard and roll back
|
-- operation will fail to modify a shard and roll back
|
||||||
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
|
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
|
||||||
ERROR: column "b" of relation "numbers_hash_560170" does not exist
|
ERROR: column "b" of relation "numbers_hash_560170" does not exist
|
||||||
CONTEXT: while executing command on localhost:57637
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
COPY numbers_hash, line 1: "1,1"
|
COPY numbers_hash, line 1: "1,1"
|
||||||
-- verify no row is inserted
|
-- verify no row is inserted
|
||||||
SELECT count(a) FROM numbers_hash;
|
SELECT count(a) FROM numbers_hash;
|
||||||
count
|
count
|
||||||
-------
|
---------------------------------------------------------------------
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -883,7 +844,7 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
|
||||||
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
|
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
|
||||||
shardid | shardstate | nodename | nodeport
|
shardid | shardstate | nodename | nodeport
|
||||||
---------+------------+-----------+----------
|
---------------------------------------------------------------------
|
||||||
560170 | 1 | localhost | 57637
|
560170 | 1 | localhost | 57637
|
||||||
560170 | 1 | localhost | 57638
|
560170 | 1 | localhost | 57638
|
||||||
560171 | 1 | localhost | 57637
|
560171 | 1 | localhost | 57637
|
||||||
|
@ -897,7 +858,7 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
DROP TABLE numbers_hash;
|
DROP TABLE numbers_hash;
|
||||||
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
nodename | nodeport | success | result
|
nodename | nodeport | success | result
|
||||||
-----------+----------+---------+-----------
|
---------------------------------------------------------------------
|
||||||
localhost | 57637 | t | DROP ROLE
|
localhost | 57637 | t | DROP ROLE
|
||||||
localhost | 57638 | t | DROP ROLE
|
localhost | 57638 | t | DROP ROLE
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
@ -910,14 +871,14 @@ col2 character varying(255) NOT NULL
|
||||||
);
|
);
|
||||||
SELECT create_reference_table('test_binaryless_builtin');
|
SELECT create_reference_table('test_binaryless_builtin');
|
||||||
create_reference_table
|
create_reference_table
|
||||||
------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\COPY test_binaryless_builtin FROM STDIN WITH (format CSV)
|
\COPY test_binaryless_builtin FROM STDIN WITH (format CSV)
|
||||||
SELECT * FROM test_binaryless_builtin;
|
SELECT * FROM test_binaryless_builtin;
|
||||||
col1 | col2
|
col1 | col2
|
||||||
---------------------+-------
|
---------------------------------------------------------------------
|
||||||
postgres=r/postgres | test
|
postgres=r/postgres | test
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -927,7 +888,7 @@ BEGIN;
|
||||||
CREATE TABLE tt1(id int);
|
CREATE TABLE tt1(id int);
|
||||||
SELECT create_distributed_table('tt1','id');
|
SELECT create_distributed_table('tt1','id');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -938,7 +899,7 @@ END;
|
||||||
CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int);
|
CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int);
|
||||||
SELECT create_distributed_table('drop_copy_test_table','col3');
|
SELECT create_distributed_table('drop_copy_test_table','col3');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -946,7 +907,7 @@ ALTER TABLE drop_copy_test_table drop column col1;
|
||||||
COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV;
|
COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV;
|
||||||
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
||||||
col2 | col3 | col4
|
col2 | col3 | col4
|
||||||
------+------+------
|
---------------------------------------------------------------------
|
||||||
| 1 |
|
| 1 |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -954,7 +915,7 @@ ALTER TABLE drop_copy_test_table drop column col4;
|
||||||
COPY drop_copy_test_table (col2,col3) from STDIN with CSV;
|
COPY drop_copy_test_table (col2,col3) from STDIN with CSV;
|
||||||
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
||||||
col2 | col3
|
col2 | col3
|
||||||
------+------
|
---------------------------------------------------------------------
|
||||||
| 1
|
| 1
|
||||||
| 1
|
| 1
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
@ -964,7 +925,7 @@ DROP TABLE drop_copy_test_table;
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
||||||
relname
|
relname
|
||||||
---------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
@ -978,7 +939,7 @@ NOTICE: copying the data has completed
|
||||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.trigger_flush$$)
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.trigger_flush$$)
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -990,7 +951,7 @@ SET citus.multi_shard_modify_mode TO 'sequential';
|
||||||
CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int);
|
CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int);
|
||||||
SELECT create_distributed_table('trigger_switchover','a');
|
SELECT create_distributed_table('trigger_switchover','a');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -1001,7 +962,7 @@ ABORT;
|
||||||
CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb);
|
CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb);
|
||||||
SELECT create_distributed_table('copy_jsonb', 'key', colocate_with => 'none');
|
SELECT create_distributed_table('copy_jsonb', 'key', colocate_with => 'none');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
@ -1009,7 +970,7 @@ SELECT create_distributed_table('copy_jsonb', 'key', colocate_with => 'none');
|
||||||
\COPY copy_jsonb (key, value) FROM STDIN
|
\COPY copy_jsonb (key, value) FROM STDIN
|
||||||
SELECT * FROM copy_jsonb ORDER BY key;
|
SELECT * FROM copy_jsonb ORDER BY key;
|
||||||
key | value | extra
|
key | value | extra
|
||||||
-------+----------------------------+-------------
|
---------------------------------------------------------------------
|
||||||
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
||||||
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
@ -1019,7 +980,7 @@ COPY copy_jsonb TO :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
|
||||||
COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
|
COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
|
||||||
SELECT * FROM copy_jsonb ORDER BY key;
|
SELECT * FROM copy_jsonb ORDER BY key;
|
||||||
key | value | extra
|
key | value | extra
|
||||||
-------+----------------------------+-------------
|
---------------------------------------------------------------------
|
||||||
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
||||||
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
||||||
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
||||||
|
@ -1028,7 +989,7 @@ SELECT * FROM copy_jsonb ORDER BY key;
|
||||||
|
|
||||||
-- JSONB parsing error without validation: no line number
|
-- JSONB parsing error without validation: no line number
|
||||||
\COPY copy_jsonb (key, value) FROM STDIN
|
\COPY copy_jsonb (key, value) FROM STDIN
|
||||||
ERROR: invalid input syntax for type json
|
ERROR: invalid input syntax for json
|
||||||
DETAIL: The input string ended unexpectedly.
|
DETAIL: The input string ended unexpectedly.
|
||||||
CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0
|
CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0
|
||||||
COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"
|
COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"
|
||||||
|
@ -1038,7 +999,7 @@ SET citus.skip_jsonb_validation_in_copy TO off;
|
||||||
\COPY copy_jsonb (key, value) FROM STDIN
|
\COPY copy_jsonb (key, value) FROM STDIN
|
||||||
SELECT * FROM copy_jsonb ORDER BY key;
|
SELECT * FROM copy_jsonb ORDER BY key;
|
||||||
key | value | extra
|
key | value | extra
|
||||||
-------+----------------------------+-------------
|
---------------------------------------------------------------------
|
||||||
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
||||||
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
@ -1048,7 +1009,7 @@ COPY copy_jsonb TO :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
|
||||||
COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
|
COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
|
||||||
SELECT * FROM copy_jsonb ORDER BY key;
|
SELECT * FROM copy_jsonb ORDER BY key;
|
||||||
key | value | extra
|
key | value | extra
|
||||||
-------+----------------------------+-------------
|
---------------------------------------------------------------------
|
||||||
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
||||||
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
|
||||||
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
green | {"b": 0, "g": 255, "r": 0} | ["default"]
|
||||||
|
@ -1057,7 +1018,7 @@ SELECT * FROM copy_jsonb ORDER BY key;
|
||||||
|
|
||||||
-- JSONB parsing error with validation: should see line number
|
-- JSONB parsing error with validation: should see line number
|
||||||
\COPY copy_jsonb (key, value) FROM STDIN
|
\COPY copy_jsonb (key, value) FROM STDIN
|
||||||
ERROR: invalid input syntax for type json
|
ERROR: invalid input syntax for json
|
||||||
DETAIL: The input string ended unexpectedly.
|
DETAIL: The input string ended unexpectedly.
|
||||||
CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0
|
CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0
|
||||||
COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"
|
COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"
|
||||||
|
|
|
@ -1,59 +0,0 @@
|
||||||
setup
|
|
||||||
{
|
|
||||||
CREATE TABLE dist_table(a INT, b INT);
|
|
||||||
SELECT create_distributed_table('dist_table', 'a');
|
|
||||||
}
|
|
||||||
|
|
||||||
teardown
|
|
||||||
{
|
|
||||||
DROP TABLE IF EXISTS dist_table;
|
|
||||||
}
|
|
||||||
|
|
||||||
session "s1"
|
|
||||||
|
|
||||||
step "s1-begin"
|
|
||||||
{
|
|
||||||
BEGIN;
|
|
||||||
}
|
|
||||||
|
|
||||||
step "s1-drop-table"
|
|
||||||
{
|
|
||||||
DROP TABLE dist_table;
|
|
||||||
}
|
|
||||||
|
|
||||||
step "s1-commit"
|
|
||||||
{
|
|
||||||
COMMIT;
|
|
||||||
}
|
|
||||||
|
|
||||||
session "s2"
|
|
||||||
|
|
||||||
step "s2-begin"
|
|
||||||
{
|
|
||||||
BEGIN;
|
|
||||||
}
|
|
||||||
|
|
||||||
step "s2-citus-update-table-statistics"
|
|
||||||
{
|
|
||||||
SET client_min_messages TO NOTICE;
|
|
||||||
SELECT citus_update_table_statistics(logicalrelid) FROM pg_dist_partition;
|
|
||||||
}
|
|
||||||
|
|
||||||
step "s2-citus-shards"
|
|
||||||
{
|
|
||||||
SELECT 1 AS result FROM citus_shards GROUP BY result;
|
|
||||||
}
|
|
||||||
|
|
||||||
step "s2-commit"
|
|
||||||
{
|
|
||||||
COMMIT;
|
|
||||||
}
|
|
||||||
|
|
||||||
permutation "s1-begin" "s1-drop-table" "s2-citus-update-table-statistics" "s1-commit"
|
|
||||||
permutation "s1-begin" "s1-drop-table" "s2-citus-shards" "s1-commit"
|
|
||||||
permutation "s2-begin" "s2-citus-shards" "s1-drop-table" "s2-commit"
|
|
||||||
|
|
||||||
// ERROR: tuple concurrently deleted -- is expected in the following permutation
|
|
||||||
// Check the explanation at PR #5155 in the following comment
|
|
||||||
// https://github.com/citusdata/citus/pull/5155#issuecomment-897028194
|
|
||||||
permutation "s2-begin" "s2-citus-update-table-statistics" "s1-drop-table" "s2-commit"
|
|
Loading…
Reference in New Issue