Merge pull request #670 from citusdata/bugfix/empty_stage

Improve WorkerShardStats to avoid invalid value bugs
pull/696/head
Marco Slot 2016-07-29 20:17:08 +02:00 committed by GitHub
commit ae7bcfff45
6 changed files with 246 additions and 90 deletions

View File

@ -315,6 +315,26 @@ MultiClientConnectionUp(int32 connectionId)
}
/* MultiClientExecute synchronously executes a query over the given connection. */
bool
MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
int *rowCount, int *columnCount)
{
bool querySent = false;
bool queryOK = false;
querySent = MultiClientSendQuery(connectionId, query);
if (!querySent)
{
return false;
}
queryOK = MultiClientQueryResult(connectionId, queryResult, rowCount, columnCount);
return queryOK;
}
/* MultiClientSendQuery sends the given query over the given connection. */
bool
MultiClientSendQuery(int32 connectionId, const char *query)
@ -532,6 +552,15 @@ MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex)
}
/* MultiClientValueIsNull returns whether the value at the given position is null. */
bool
MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex)
{
bool isNull = PQgetisnull((PGresult *) queryResult, rowIndex, columnIndex);
return isNull;
}
/* MultiClientClearResult free's the memory associated with a PGresult. */
void
MultiClientClearResult(void *queryResult)

View File

@ -21,6 +21,7 @@
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "distributed/multi_client_executor.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
@ -45,11 +46,6 @@ static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue);
static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId,
char *tableName);
static StringInfo WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, char *selectQuery);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_empty_shard);
@ -314,8 +310,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
errdetail("Marking this shard placement as inactive")));
}
RESUME_INTERRUPTS();
/* update shard statistics and get new shard size */
newShardSize = UpdateShardStatistics(shardId);
@ -323,6 +317,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
RESUME_INTERRUPTS();
PG_RETURN_FLOAT4(shardFillLevel);
}
@ -585,109 +581,117 @@ static bool
WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName,
uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
{
bool shardStatsOK = true;
PG_TRY();
{
uint64 tableSize = WorkerTableSize(nodeName, nodePort, relationId, shardName);
StringInfo minValue = WorkerPartitionValue(nodeName, nodePort, relationId,
shardName, SHARD_MIN_VALUE_QUERY);
StringInfo maxValue = WorkerPartitionValue(nodeName, nodePort, relationId,
shardName, SHARD_MAX_VALUE_QUERY);
(*shardSize) = tableSize;
(*shardMinValue) = cstring_to_text_with_len(minValue->data, minValue->len);
(*shardMaxValue) = cstring_to_text_with_len(maxValue->data, maxValue->len);
}
PG_CATCH();
{
shardStatsOK = false;
}
PG_END_TRY();
return shardStatsOK;
}
/*
* WorkerTableSize queries the worker node to extract the disk space used by the
* given relation. The function assumes the relation represents a regular table or
* a cstore_fdw table.
*/
static uint64
WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName)
{
uint64 tableSize = 0;
List *queryResultList = NIL;
StringInfo tableSizeString = NULL;
char *tableSizeStringEnd = NULL;
char *quotedTableName = quote_literal_cstr(tableName);
bool cstoreTable = CStoreTable(relationId);
char *quotedShardName = NULL;
bool cstoreTable = false;
StringInfo tableSizeQuery = makeStringInfo();
const uint32 unusedTableId = 1;
char partitionType = PartitionMethod(relationId);
Var *partitionColumn = NULL;
char *partitionColumnName = NULL;
StringInfo partitionValueQuery = makeStringInfo();
int32 connectionId = -1;
bool queryOK = false;
void *queryResult = NULL;
int rowCount = 0;
int columnCount = 0;
const int minValueIndex = 0;
const int maxValueIndex = 1;
uint64 tableSize = 0;
char *tableSizeString = NULL;
char *tableSizeStringEnd = NULL;
bool minValueIsNull = false;
bool maxValueIsNull = false;
*shardSize = 0;
*shardMinValue = NULL;
*shardMaxValue = NULL;
connectionId = MultiClientConnect(nodeName, nodePort, NULL, NULL);
if (connectionId == INVALID_CONNECTION_ID)
{
return false;
}
quotedShardName = quote_literal_cstr(shardName);
cstoreTable = CStoreTable(relationId);
if (cstoreTable)
{
appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, quotedTableName);
appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, quotedShardName);
}
else
{
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedTableName);
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
}
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, NULL, tableSizeQuery);
if (queryResultList == NIL)
queryOK = MultiClientExecute(connectionId, tableSizeQuery->data,
&queryResult, &rowCount, &columnCount);
if (!queryOK)
{
ereport(ERROR, (errmsg("could not receive table size from node "
"\"%s:%u\"", nodeName, nodePort)));
MultiClientDisconnect(connectionId);
return false;
}
tableSizeString = (StringInfo) linitial(queryResultList);
tableSizeString = MultiClientGetValue(queryResult, 0, 0);
if (tableSizeString == NULL)
{
MultiClientDisconnect(connectionId);
return false;
}
errno = 0;
tableSize = strtoull(tableSizeString->data, &tableSizeStringEnd, 0);
tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0);
if (errno != 0 || (*tableSizeStringEnd) != '\0')
{
ereport(ERROR, (errmsg("could not extract table size for table \"%s\"",
quotedTableName)));
MultiClientClearResult(queryResult);
MultiClientDisconnect(connectionId);
return false;
}
return tableSize;
}
*shardSize = tableSize;
MultiClientClearResult(queryResult);
/*
* WorkerPartitionValue helps in extracting partition column's min or max value
* from the given shard. For this, the function resolves the given distributed
* relation's partition column, connects to the worker node, and runs a select
* query on the given shard.
*/
static StringInfo
WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, char *selectQuery)
{
StringInfo partitionValue = NULL;
List *queryResultList = NIL;
uint32 unusedTableId = 1;
Var *partitionColumn = PartitionColumn(relationId, unusedTableId);
char *partitionColumnName = get_attname(relationId, partitionColumn->varattno);
StringInfo partitionValueQuery = makeStringInfo();
appendStringInfo(partitionValueQuery, selectQuery, partitionColumnName, shardName);
/*
* Note that the following call omits the partition column value's size, and
* simply casts the results to a (char *). If the user partitioned the table
* on a binary byte array, this approach fails and should be fixed.
*/
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, NULL, partitionValueQuery);
if (queryResultList == NIL)
if (partitionType != DISTRIBUTE_BY_APPEND)
{
ereport(ERROR, (errmsg("could not receive shard min/max values from node "
"\"%s:%u\"", nodeName, nodePort)));
/* we don't need min/max for non-append distributed tables */
MultiClientDisconnect(connectionId);
return true;
}
partitionValue = (StringInfo) linitial(queryResultList);
return partitionValue;
/* fill in the partition column name and shard name in the query. */
partitionColumn = PartitionColumn(relationId, unusedTableId);
partitionColumnName = get_attname(relationId, partitionColumn->varattno);
appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY,
partitionColumnName, partitionColumnName, shardName);
queryOK = MultiClientExecute(connectionId, partitionValueQuery->data,
&queryResult, &rowCount, &columnCount);
if (!queryOK)
{
MultiClientDisconnect(connectionId);
return false;
}
minValueIsNull = MultiClientValueIsNull(queryResult, 0, minValueIndex);
maxValueIsNull = MultiClientValueIsNull(queryResult, 0, maxValueIndex);
if (!minValueIsNull && !maxValueIsNull)
{
char *minValueResult = MultiClientGetValue(queryResult, 0, minValueIndex);
char *maxValueResult = MultiClientGetValue(queryResult, 0, maxValueIndex);
*shardMinValue = cstring_to_text(minValueResult);
*shardMaxValue = cstring_to_text(maxValueResult);
}
MultiClientClearResult(queryResult);
MultiClientDisconnect(connectionId);
return true;
}

View File

@ -58,8 +58,7 @@
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
#define WORKER_APPEND_TABLE_TO_SHARD \
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
#define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s"
#define SHARD_RANGE_QUERY "SELECT min(%s), max(%s) FROM %s"
#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)"
#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)"
#define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s"

View File

@ -104,6 +104,8 @@ extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
extern void MultiClientDisconnect(int32 connectionId);
extern bool MultiClientConnectionUp(int32 connectionId);
extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
int *rowCount, int *columnCount);
extern bool MultiClientSendQuery(int32 connectionId, const char *query);
extern bool MultiClientCancel(int32 connectionId);
extern ResultStatus MultiClientResultStatus(int32 connectionId);
@ -114,6 +116,7 @@ extern bool MultiClientQueryResult(int32 connectionId, void **queryResult,
extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult,
int *rowCount, int *columnCount);
extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
extern bool MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex);
extern void MultiClientClearResult(void *queryResult);
extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);

View File

@ -97,3 +97,46 @@ SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_left
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_right;
DROP TABLE multi_append_table_to_shard_left;
-- Check partitioning by date
CREATE TABLE multi_append_table_to_shard_date
(
event_date DATE,
value INT
);
SELECT master_create_distributed_table('multi_append_table_to_shard_date', 'event_date', 'append');
-- Create an empty shard and check that we can query the table
SELECT master_create_empty_shard('multi_append_table_to_shard_date');
SELECT * FROM multi_append_table_to_shard_date;
-- Stage an empty table and check that we can query the distributed table
CREATE TABLE multi_append_table_to_shard_stage (LIKE multi_append_table_to_shard_date);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
SELECT * FROM multi_append_table_to_shard_date;
-- Stage NULL values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES (NULL, NULL);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
SELECT * FROM multi_append_table_to_shard_date;
-- Stage regular values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-01-01', 3);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
SELECT * FROM multi_append_table_to_shard_date;
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_date;

View File

@ -151,3 +151,81 @@ SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_left
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_right;
DROP TABLE multi_append_table_to_shard_left;
-- Check partitioning by date
CREATE TABLE multi_append_table_to_shard_date
(
event_date DATE,
value INT
);
SELECT master_create_distributed_table('multi_append_table_to_shard_date', 'event_date', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Create an empty shard and check that we can query the table
SELECT master_create_empty_shard('multi_append_table_to_shard_date');
master_create_empty_shard
---------------------------
230004
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
(0 rows)
-- Stage an empty table and check that we can query the distributed table
CREATE TABLE multi_append_table_to_shard_stage (LIKE multi_append_table_to_shard_date);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
master_append_table_to_shard
------------------------------
0
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
(0 rows)
-- Stage NULL values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES (NULL, NULL);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
|
(1 row)
-- Stage regular values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-01-01', 3);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
|
|
01-01-2016 | 3
(3 rows)
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_date;