Harden citus_tables against node failure

pull/4384/head
Marco Slot 2020-12-03 22:27:20 +01:00
parent e24e7985f7
commit 8e8adcd92a
12 changed files with 152 additions and 53 deletions

View File

@ -71,9 +71,11 @@ static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId);
static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static uint64 DistributedTableSize(Oid relationId, char *sizeQuery);
static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
char *sizeQuery);
static bool DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError,
uint64 *tableSize);
static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
char *sizeQuery, bool failOnError,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static void ErrorIfNotSuitableToGetSize(Oid relationId);
static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
@ -93,6 +95,8 @@ Datum
citus_total_relation_size(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
bool failOnError = PG_GETARG_BOOL(1);
char *tableSizeFunction = PG_TOTAL_RELATION_SIZE_FUNCTION;
CheckCitusVersion(ERROR);
@ -102,9 +106,15 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
}
uint64 totalRelationSize = DistributedTableSize(relationId, tableSizeFunction);
uint64 tableSize = 0;
PG_RETURN_INT64(totalRelationSize);
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}
PG_RETURN_INT64(tableSize);
}
@ -116,6 +126,7 @@ Datum
citus_table_size(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
char *tableSizeFunction = PG_TABLE_SIZE_FUNCTION;
CheckCitusVersion(ERROR);
@ -125,7 +136,13 @@ citus_table_size(PG_FUNCTION_ARGS)
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
}
uint64 tableSize = DistributedTableSize(relationId, tableSizeFunction);
uint64 tableSize = 0;
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}
PG_RETURN_INT64(tableSize);
}
@ -139,6 +156,7 @@ Datum
citus_relation_size(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
char *tableSizeFunction = PG_RELATION_SIZE_FUNCTION;
CheckCitusVersion(ERROR);
@ -148,7 +166,13 @@ citus_relation_size(PG_FUNCTION_ARGS)
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
}
uint64 relationSize = DistributedTableSize(relationId, tableSizeFunction);
uint64 relationSize = 0;
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}
PG_RETURN_INT64(relationSize);
}
@ -159,40 +183,61 @@ citus_relation_size(PG_FUNCTION_ARGS)
* It first checks whether the table is distributed and size query can be run on
* it. Connection to each node has to be established to get the size of the table.
*/
static uint64
DistributedTableSize(Oid relationId, char *sizeQuery)
static bool
DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 *tableSize)
{
uint64 totalRelationSize = 0;
int logLevel = WARNING;
if (failOnError)
{
logLevel = ERROR;
}
uint64 sumOfSizes = 0;
if (XactModificationLevel == XACT_MODIFICATION_DATA)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("citus size functions cannot be called in transaction"
" blocks which contain multi-shard data modifications")));
ereport(logLevel, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("citus size functions cannot be called in transaction "
"blocks which contain multi-shard data "
"modifications")));
return false;
}
Relation relation = try_relation_open(relationId, AccessShareLock);
if (relation == NULL)
{
ereport(ERROR,
ereport(logLevel,
(errmsg("could not compute table size: relation does not exist")));
return false;
}
ErrorIfNotSuitableToGetSize(relationId);
table_close(relation, AccessShareLock);
List *workerNodeList = ActiveReadableNodeList();
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
uint64 relationSizeOnNode = DistributedTableSizeOnWorker(workerNode, relationId,
sizeQuery);
totalRelationSize += relationSizeOnNode;
uint64 relationSizeOnNode = 0;
bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQuery,
failOnError, &relationSizeOnNode);
if (!gotSize)
{
return false;
}
sumOfSizes += relationSizeOnNode;
}
table_close(relation, AccessShareLock);
*tableSize = sumOfSizes;
return totalRelationSize;
return true;
}
@ -201,14 +246,21 @@ DistributedTableSize(Oid relationId, char *sizeQuery)
* size of that relation on the given workerNode by summing up the size of each
* shard placement.
*/
static uint64
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery)
static bool
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery,
bool failOnError, uint64 *tableSize)
{
int logLevel = WARNING;
if (failOnError)
{
logLevel = ERROR;
}
char *workerNodeName = workerNode->workerName;
uint32 workerNodePort = workerNode->workerPort;
uint32 connectionFlag = 0;
PGresult *result = NULL;
bool raiseErrors = true;
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
@ -223,19 +275,38 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
if (queryResult != 0)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot get the size because of a connection error")));
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to %s:%d to get size of "
"table \"%s\"",
workerNodeName, workerNodePort,
get_rel_name(relationId))));
return false;
}
List *sizeList = ReadFirstColumnAsText(result);
if (list_length(sizeList) != 1)
{
PQclear(result);
ClearResults(connection, failOnError);
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot parse size of table \"%s\" from %s:%d",
get_rel_name(relationId), workerNodeName,
workerNodePort)));
return false;
}
StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
char *tableSizeString = tableSizeStringInfo->data;
uint64 tableSize = SafeStringToUint64(tableSizeString);
*tableSize = SafeStringToUint64(tableSizeString);
PQclear(result);
ClearResults(connection, raiseErrors);
ClearResults(connection, failOnError);
return tableSize;
return true;
}

View File

@ -1,5 +1,8 @@
-- citus--9.5-1--10.0-1
DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
#include "udfs/citus_total_relation_size/10.0-1.sql"
#include "udfs/citus_tables/10.0-1.sql"
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql"

View File

@ -6,3 +6,6 @@
#include "../../../columnar/sql/downgrades/columnar--10.0-1--9.5-1.sql"
DROP VIEW public.citus_tables;
DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean);
#include "../udfs/citus_total_relation_size/7.0-1.sql"

View File

@ -4,7 +4,7 @@ SELECT
CASE WHEN partkey IS NOT NULL THEN 'distributed' ELSE 'reference' END AS "Citus Table Type",
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS "Distribution Column",
colocationid AS "Colocation ID",
pg_size_pretty(citus_total_relation_size(logicalrelid)) AS "Size",
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS "Size",
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS "Shard Count",
pg_get_userbyid(relowner) AS "Owner",
amname AS "Access Method"

View File

@ -4,7 +4,7 @@ SELECT
CASE WHEN partkey IS NOT NULL THEN 'distributed' ELSE 'reference' END AS "Citus Table Type",
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS "Distribution Column",
colocationid AS "Colocation ID",
pg_size_pretty(citus_total_relation_size(logicalrelid)) AS "Size",
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS "Size",
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS "Shard Count",
pg_get_userbyid(relowner) AS "Owner",
amname AS "Access Method"

View File

@ -0,0 +1,6 @@
CREATE FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, fail_on_error boolean default true)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_total_relation_size$$;
COMMENT ON FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, boolean)
IS 'get total disk space used by the specified table';

View File

@ -0,0 +1,6 @@
CREATE FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_total_relation_size$$;
COMMENT ON FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass)
IS 'get total disk space used by the specified table';

View File

@ -0,0 +1,6 @@
CREATE FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, fail_on_error boolean default true)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_total_relation_size$$;
COMMENT ON FUNCTION pg_catalog.citus_total_relation_size(logicalrelid regclass, boolean)
IS 'get total disk space used by the specified table';

View File

@ -476,20 +476,22 @@ SELECT * FROM print_extension_changes();
-- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1';
SELECT * FROM print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
| access method columnar
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
| function alter_columnar_table_set(regclass,integer,integer,name,integer)
| function citus_internal.columnar_ensure_objects_exist()
| function columnar.columnar_handler(internal)
| schema columnar
| sequence columnar.storageid_seq
| table columnar.columnar_skipnodes
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(11 rows)
function citus_total_relation_size(regclass) |
| access method columnar
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
| function alter_columnar_table_set(regclass,integer,integer,name,integer)
| function citus_internal.columnar_ensure_objects_exist()
| function citus_total_relation_size(regclass,boolean)
| function columnar.columnar_handler(internal)
| schema columnar
| sequence columnar.storageid_seq
| table columnar.columnar_skipnodes
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(13 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -476,16 +476,18 @@ SELECT * FROM print_extension_changes();
-- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1';
SELECT * FROM print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
| function citus_internal.columnar_ensure_objects_exist()
| schema columnar
| sequence columnar.storageid_seq
| table columnar.columnar_skipnodes
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(7 rows)
function citus_total_relation_size(regclass) |
| function citus_internal.columnar_ensure_objects_exist()
| function citus_total_relation_size(regclass,boolean)
| schema columnar
| sequence columnar.storageid_seq
| table columnar.columnar_skipnodes
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(9 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -65,7 +65,7 @@ ORDER BY 1;
function citus_table_is_visible(oid)
function citus_table_size(regclass)
function citus_text_send_as_jsonb(text)
function citus_total_relation_size(regclass)
function citus_total_relation_size(regclass,boolean)
function citus_truncate_trigger()
function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
function citus_version()

View File

@ -62,7 +62,7 @@ ORDER BY 1;
function citus_table_is_visible(oid)
function citus_table_size(regclass)
function citus_text_send_as_jsonb(text)
function citus_total_relation_size(regclass)
function citus_total_relation_size(regclass,boolean)
function citus_truncate_trigger()
function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
function citus_version()