mirror of https://github.com/citusdata/citus.git
Merge pull request #4384 from citusdata/marcocitus/fix/citus-tables-failure
Harden citus_tables against node failurepull/4360/head
commit
4985bda3bd
|
@ -71,9 +71,11 @@ static uint64 * AllocateUint64(uint64 value);
|
|||
static void RecordDistributedRelationDependencies(Oid distributedRelationId);
|
||||
static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
|
||||
HeapTuple heapTuple);
|
||||
static uint64 DistributedTableSize(Oid relationId, char *sizeQuery);
|
||||
static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||
char *sizeQuery);
|
||||
static bool DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError,
|
||||
uint64 *tableSize);
|
||||
static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||
char *sizeQuery, bool failOnError,
|
||||
uint64 *tableSize);
|
||||
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
||||
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
||||
static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
|
||||
|
@ -93,6 +95,8 @@ Datum
|
|||
citus_total_relation_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
bool failOnError = PG_GETARG_BOOL(1);
|
||||
|
||||
char *tableSizeFunction = PG_TOTAL_RELATION_SIZE_FUNCTION;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
@ -102,9 +106,15 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
|
|||
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
|
||||
}
|
||||
|
||||
uint64 totalRelationSize = DistributedTableSize(relationId, tableSizeFunction);
|
||||
uint64 tableSize = 0;
|
||||
|
||||
PG_RETURN_INT64(totalRelationSize);
|
||||
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize))
|
||||
{
|
||||
Assert(!failOnError);
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_RETURN_INT64(tableSize);
|
||||
}
|
||||
|
||||
|
||||
|
@ -116,6 +126,7 @@ Datum
|
|||
citus_table_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
bool failOnError = true;
|
||||
char *tableSizeFunction = PG_TABLE_SIZE_FUNCTION;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
@ -125,7 +136,13 @@ citus_table_size(PG_FUNCTION_ARGS)
|
|||
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
|
||||
}
|
||||
|
||||
uint64 tableSize = DistributedTableSize(relationId, tableSizeFunction);
|
||||
uint64 tableSize = 0;
|
||||
|
||||
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize))
|
||||
{
|
||||
Assert(!failOnError);
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_RETURN_INT64(tableSize);
|
||||
}
|
||||
|
@ -139,6 +156,7 @@ Datum
|
|||
citus_relation_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
bool failOnError = true;
|
||||
char *tableSizeFunction = PG_RELATION_SIZE_FUNCTION;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
@ -148,7 +166,13 @@ citus_relation_size(PG_FUNCTION_ARGS)
|
|||
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
|
||||
}
|
||||
|
||||
uint64 relationSize = DistributedTableSize(relationId, tableSizeFunction);
|
||||
uint64 relationSize = 0;
|
||||
|
||||
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &relationSize))
|
||||
{
|
||||
Assert(!failOnError);
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_RETURN_INT64(relationSize);
|
||||
}
|
||||
|
@ -159,40 +183,61 @@ citus_relation_size(PG_FUNCTION_ARGS)
|
|||
* It first checks whether the table is distributed and size query can be run on
|
||||
* it. Connection to each node has to be established to get the size of the table.
|
||||
*/
|
||||
static uint64
|
||||
DistributedTableSize(Oid relationId, char *sizeQuery)
|
||||
static bool
|
||||
DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 *tableSize)
|
||||
{
|
||||
uint64 totalRelationSize = 0;
|
||||
int logLevel = WARNING;
|
||||
|
||||
if (failOnError)
|
||||
{
|
||||
logLevel = ERROR;
|
||||
}
|
||||
|
||||
uint64 sumOfSizes = 0;
|
||||
|
||||
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("citus size functions cannot be called in transaction"
|
||||
" blocks which contain multi-shard data modifications")));
|
||||
ereport(logLevel, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("citus size functions cannot be called in transaction "
|
||||
"blocks which contain multi-shard data "
|
||||
"modifications")));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
Relation relation = try_relation_open(relationId, AccessShareLock);
|
||||
|
||||
if (relation == NULL)
|
||||
{
|
||||
ereport(ERROR,
|
||||
ereport(logLevel,
|
||||
(errmsg("could not compute table size: relation does not exist")));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ErrorIfNotSuitableToGetSize(relationId);
|
||||
|
||||
table_close(relation, AccessShareLock);
|
||||
|
||||
List *workerNodeList = ActiveReadableNodeList();
|
||||
WorkerNode *workerNode = NULL;
|
||||
foreach_ptr(workerNode, workerNodeList)
|
||||
{
|
||||
uint64 relationSizeOnNode = DistributedTableSizeOnWorker(workerNode, relationId,
|
||||
sizeQuery);
|
||||
totalRelationSize += relationSizeOnNode;
|
||||
uint64 relationSizeOnNode = 0;
|
||||
|
||||
bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQuery,
|
||||
failOnError, &relationSizeOnNode);
|
||||
if (!gotSize)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
sumOfSizes += relationSizeOnNode;
|
||||
}
|
||||
|
||||
table_close(relation, AccessShareLock);
|
||||
*tableSize = sumOfSizes;
|
||||
|
||||
return totalRelationSize;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -201,14 +246,21 @@ DistributedTableSize(Oid relationId, char *sizeQuery)
|
|||
* size of that relation on the given workerNode by summing up the size of each
|
||||
* shard placement.
|
||||
*/
|
||||
static uint64
|
||||
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery)
|
||||
static bool
|
||||
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery,
|
||||
bool failOnError, uint64 *tableSize)
|
||||
{
|
||||
int logLevel = WARNING;
|
||||
|
||||
if (failOnError)
|
||||
{
|
||||
logLevel = ERROR;
|
||||
}
|
||||
|
||||
char *workerNodeName = workerNode->workerName;
|
||||
uint32 workerNodePort = workerNode->workerPort;
|
||||
uint32 connectionFlag = 0;
|
||||
PGresult *result = NULL;
|
||||
bool raiseErrors = true;
|
||||
|
||||
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
|
||||
|
||||
|
@ -223,19 +275,38 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
|
|||
|
||||
if (queryResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("cannot get the size because of a connection error")));
|
||||
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not connect to %s:%d to get size of "
|
||||
"table \"%s\"",
|
||||
workerNodeName, workerNodePort,
|
||||
get_rel_name(relationId))));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
List *sizeList = ReadFirstColumnAsText(result);
|
||||
if (list_length(sizeList) != 1)
|
||||
{
|
||||
PQclear(result);
|
||||
ClearResults(connection, failOnError);
|
||||
|
||||
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("cannot parse size of table \"%s\" from %s:%d",
|
||||
get_rel_name(relationId), workerNodeName,
|
||||
workerNodePort)));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
|
||||
char *tableSizeString = tableSizeStringInfo->data;
|
||||
uint64 tableSize = SafeStringToUint64(tableSizeString);
|
||||
|
||||
*tableSize = SafeStringToUint64(tableSizeString);
|
||||
|
||||
PQclear(result);
|
||||
ClearResults(connection, raiseErrors);
|
||||
ClearResults(connection, failOnError);
|
||||
|
||||
return tableSize;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
-- citus--9.5-1--10.0-1
|
||||
|
||||
DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
|
||||
|
||||
#include "udfs/citus_total_relation_size/10.0-1.sql"
|
||||
#include "udfs/citus_tables/10.0-1.sql"
|
||||
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
|
||||
|
||||
|
|
|
@ -6,3 +6,6 @@
|
|||
#include "../../../columnar/sql/downgrades/columnar--10.0-1--9.5-1.sql"
|
||||
|
||||
DROP VIEW public.citus_tables;
|
||||
DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean);
|
||||
|
||||
#include "../udfs/citus_total_relation_size/7.0-1.sql"
|
||||
|
|
|
@ -4,7 +4,7 @@ SELECT
|
|||
CASE WHEN partkey IS NOT NULL THEN 'distributed' ELSE 'reference' END AS "Citus Table Type",
|
||||
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS "Distribution Column",
|
||||
colocationid AS "Colocation ID",
|
||||
pg_size_pretty(citus_total_relation_size(logicalrelid)) AS "Size",
|
||||
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS "Size",
|
||||
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS "Shard Count",
|
||||
pg_get_userbyid(relowner) AS "Owner",
|
||||
amname AS "Access Method"
|
||||
|
|
|
@ -4,7 +4,7 @@ SELECT
|
|||
CASE WHEN partkey IS NOT NULL THEN 'distributed' ELSE 'reference' END AS "Citus Table Type",
|
||||
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS "Distribution Column",
|
||||
colocationid AS "Colocation ID",
|
||||
pg_size_pretty(citus_total_relation_size(logicalrelid)) AS "Size",
|
||||
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS "Size",
|
||||
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS "Shard Count",
|
||||
pg_get_userbyid(relowner) AS "Owner",
|
||||
amname AS "Access Method"
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
CREATE FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, fail_on_error boolean default true)
|
||||
RETURNS bigint
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$citus_total_relation_size$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, boolean)
|
||||
IS 'get total disk space used by the specified table';
|
|
@ -0,0 +1,6 @@
|
|||
CREATE FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass)
|
||||
RETURNS bigint
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$citus_total_relation_size$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass)
|
||||
IS 'get total disk space used by the specified table';
|
|
@ -0,0 +1,6 @@
|
|||
CREATE FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, fail_on_error boolean default true)
|
||||
RETURNS bigint
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$citus_total_relation_size$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, boolean)
|
||||
IS 'get total disk space used by the specified table';
|
|
@ -476,20 +476,22 @@ SELECT * FROM print_extension_changes();
|
|||
-- Snapshot of state at 10.0-1
|
||||
ALTER EXTENSION citus UPDATE TO '10.0-1';
|
||||
SELECT * FROM print_extension_changes();
|
||||
previous_object | current_object
|
||||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
| access method columnar
|
||||
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
|
||||
| function alter_columnar_table_set(regclass,integer,integer,name,integer)
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function columnar.columnar_handler(internal)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(11 rows)
|
||||
function citus_total_relation_size(regclass) |
|
||||
| access method columnar
|
||||
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
|
||||
| function alter_columnar_table_set(regclass,integer,integer,name,integer)
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| function columnar.columnar_handler(internal)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(13 rows)
|
||||
|
||||
DROP TABLE prev_objects, extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -476,16 +476,18 @@ SELECT * FROM print_extension_changes();
|
|||
-- Snapshot of state at 10.0-1
|
||||
ALTER EXTENSION citus UPDATE TO '10.0-1';
|
||||
SELECT * FROM print_extension_changes();
|
||||
previous_object | current_object
|
||||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(7 rows)
|
||||
function citus_total_relation_size(regclass) |
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(9 rows)
|
||||
|
||||
DROP TABLE prev_objects, extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -65,7 +65,7 @@ ORDER BY 1;
|
|||
function citus_table_is_visible(oid)
|
||||
function citus_table_size(regclass)
|
||||
function citus_text_send_as_jsonb(text)
|
||||
function citus_total_relation_size(regclass)
|
||||
function citus_total_relation_size(regclass,boolean)
|
||||
function citus_truncate_trigger()
|
||||
function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
|
||||
function citus_version()
|
||||
|
|
|
@ -62,7 +62,7 @@ ORDER BY 1;
|
|||
function citus_table_is_visible(oid)
|
||||
function citus_table_size(regclass)
|
||||
function citus_text_send_as_jsonb(text)
|
||||
function citus_total_relation_size(regclass)
|
||||
function citus_total_relation_size(regclass,boolean)
|
||||
function citus_truncate_trigger()
|
||||
function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
|
||||
function citus_version()
|
||||
|
|
Loading…
Reference in New Issue