diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c
index 88d465337..597960bf2 100644
--- a/src/backend/distributed/executor/multi_client_executor.c
+++ b/src/backend/distributed/executor/multi_client_executor.c
@@ -315,6 +315,26 @@ MultiClientConnectionUp(int32 connectionId)
 }
 
 
+/* MultiClientExecute synchronously executes a query over the given connection. */
+bool
+MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
+				   int *rowCount, int *columnCount)
+{
+	bool querySent = false;
+	bool queryOK = false;
+
+	querySent = MultiClientSendQuery(connectionId, query);
+	if (!querySent)
+	{
+		return false;
+	}
+
+	queryOK = MultiClientQueryResult(connectionId, queryResult, rowCount, columnCount);
+
+	return queryOK;
+}
+
+
 /* MultiClientSendQuery sends the given query over the given connection. */
 bool
 MultiClientSendQuery(int32 connectionId, const char *query)
@@ -532,6 +552,15 @@ MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex)
 }
 
 
+/* MultiClientValueIsNull returns whether the value at the given position is null. */
+bool
+MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex)
+{
+	bool isNull = PQgetisnull((PGresult *) queryResult, rowIndex, columnIndex);
+	return isNull;
+}
+
+
 /* MultiClientClearResult free's the memory associated with a PGresult. */
 void
 MultiClientClearResult(void *queryResult)
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c
index e4ae01c73..49941cc8f 100644
--- a/src/backend/distributed/master/master_stage_protocol.c
+++ b/src/backend/distributed/master/master_stage_protocol.c
@@ -21,6 +21,7 @@
 #include "access/htup_details.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
+#include "distributed/multi_client_executor.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
@@ -45,11 +46,6 @@ static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
 static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
 							 char *shardName, uint64 *shardSize,
 							 text **shardMinValue, text **shardMaxValue);
-static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId,
-							  char *tableName);
-static StringInfo WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid relationId,
-									   char *shardName, char *selectQuery);
-
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_empty_shard);
@@ -314,8 +310,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 							  errdetail("Marking this shard placement as inactive")));
 		}
 
-	RESUME_INTERRUPTS();
-
 	/* update shard statistics and get new shard size */
 	newShardSize = UpdateShardStatistics(shardId);
 
@@ -323,6 +317,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
 	shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
 
+	RESUME_INTERRUPTS();
+
 	PG_RETURN_FLOAT4(shardFillLevel);
 }
 
@@ -585,109 +581,119 @@ static bool
 WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName,
 				 uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
 {
-	bool shardStatsOK = true;
-
-	PG_TRY();
-	{
-		uint64 tableSize = WorkerTableSize(nodeName, nodePort, relationId, shardName);
-		StringInfo minValue = WorkerPartitionValue(nodeName, nodePort, relationId,
-												   shardName, SHARD_MIN_VALUE_QUERY);
-		StringInfo maxValue = WorkerPartitionValue(nodeName, nodePort, relationId,
-												   shardName, SHARD_MAX_VALUE_QUERY);
-
-		(*shardSize) = tableSize;
-		(*shardMinValue) = cstring_to_text_with_len(minValue->data, minValue->len);
-		(*shardMaxValue) = cstring_to_text_with_len(maxValue->data, maxValue->len);
-	}
-	PG_CATCH();
-	{
-		shardStatsOK = false;
-	}
-	PG_END_TRY();
-
-	return shardStatsOK;
-}
-
-
-/*
- * WorkerTableSize queries the worker node to extract the disk space used by the
- * given relation. The function assumes the relation represents a regular table or
- * a cstore_fdw table.
- */
-static uint64
-WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName)
-{
-	uint64 tableSize = 0;
-	List *queryResultList = NIL;
-	StringInfo tableSizeString = NULL;
-	char *tableSizeStringEnd = NULL;
-	char *quotedTableName = quote_literal_cstr(tableName);
-	bool cstoreTable = CStoreTable(relationId);
+	char *quotedShardName = NULL;
+	bool cstoreTable = false;
 	StringInfo tableSizeQuery = makeStringInfo();
 
+	const uint32 unusedTableId = 1;
+	char partitionType = PartitionMethod(relationId);
+	Var *partitionColumn = NULL;
+	char *partitionColumnName = NULL;
+	StringInfo partitionValueQuery = makeStringInfo();
 
+	int32 connectionId = -1;
+	bool queryOK = false;
+	void *queryResult = NULL;
+	int rowCount = 0;
+	int columnCount = 0;
+	const int minValueIndex = 0;
+	const int maxValueIndex = 1;
+
+	uint64 tableSize = 0;
+	char *tableSizeString = NULL;
+	char *tableSizeStringEnd = NULL;
+	bool minValueIsNull = false;
+	bool maxValueIsNull = false;
+
+	*shardSize = 0;
+	*shardMinValue = NULL;
+	*shardMaxValue = NULL;
+
+	connectionId = MultiClientConnect(nodeName, nodePort, NULL, NULL);
+	if (connectionId == INVALID_CONNECTION_ID)
+	{
+		return false;
+	}
+
+	quotedShardName = quote_literal_cstr(shardName);
+
+	cstoreTable = CStoreTable(relationId);
 	if (cstoreTable)
 	{
-		appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, quotedTableName);
+		appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, quotedShardName);
 	}
 	else
 	{
-		appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedTableName);
+		appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
 	}
 
-	queryResultList = ExecuteRemoteQuery(nodeName, nodePort, NULL, tableSizeQuery);
-	if (queryResultList == NIL)
+	queryOK = MultiClientExecute(connectionId, tableSizeQuery->data,
+								 &queryResult, &rowCount, &columnCount);
+	if (!queryOK)
 	{
-		ereport(ERROR, (errmsg("could not receive table size from node "
-							   "\"%s:%u\"", nodeName, nodePort)));
+		MultiClientDisconnect(connectionId);
+		return false;
 	}
 
-	tableSizeString = (StringInfo) linitial(queryResultList);
+	tableSizeString = MultiClientGetValue(queryResult, 0, 0);
+	if (tableSizeString == NULL)
+	{
+		/* the query succeeded, so its result must be freed before bailing out */
+		MultiClientClearResult(queryResult);
+		MultiClientDisconnect(connectionId);
+		return false;
+	}
 
 	errno = 0;
-	tableSize = strtoull(tableSizeString->data, &tableSizeStringEnd, 0);
+	tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0);
 	if (errno != 0 || (*tableSizeStringEnd) != '\0')
 	{
-		ereport(ERROR, (errmsg("could not extract table size for table \"%s\"",
-							   quotedTableName)));
+		MultiClientClearResult(queryResult);
+		MultiClientDisconnect(connectionId);
+		return false;
 	}
 
-	return tableSize;
-}
+	*shardSize = tableSize;
 
+	MultiClientClearResult(queryResult);
 
-/*
- * WorkerPartitionValue helps in extracting partition column's min or max value
- * from the given shard. For this, the function resolves the given distributed
- * relation's partition column, connects to the worker node, and runs a select
- * query on the given shard.
- */
-static StringInfo
-WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid relationId,
-					 char *shardName, char *selectQuery)
-{
-	StringInfo partitionValue = NULL;
-	List *queryResultList = NIL;
-	uint32 unusedTableId = 1;
-
-	Var *partitionColumn = PartitionColumn(relationId, unusedTableId);
-	char *partitionColumnName = get_attname(relationId, partitionColumn->varattno);
-
-	StringInfo partitionValueQuery = makeStringInfo();
-	appendStringInfo(partitionValueQuery, selectQuery, partitionColumnName, shardName);
-
-	/*
-	 * Note that the following call omits the partition column value's size, and
-	 * simply casts the results to a (char *). If the user partitioned the table
-	 * on a binary byte array, this approach fails and should be fixed.
-	 */
-	queryResultList = ExecuteRemoteQuery(nodeName, nodePort, NULL, partitionValueQuery);
-	if (queryResultList == NIL)
+	if (partitionType != DISTRIBUTE_BY_APPEND)
 	{
-		ereport(ERROR, (errmsg("could not receive shard min/max values from node "
-							   "\"%s:%u\"", nodeName, nodePort)));
+		/* we don't need min/max for non-append distributed tables */
+		MultiClientDisconnect(connectionId);
+
+		return true;
 	}
 
-	partitionValue = (StringInfo) linitial(queryResultList);
-	return partitionValue;
+	/* fill in the partition column name and shard name in the query. */
+	partitionColumn = PartitionColumn(relationId, unusedTableId);
+	partitionColumnName = get_attname(relationId, partitionColumn->varattno);
+
+	appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY,
+					 partitionColumnName, partitionColumnName, shardName);
+
+	queryOK = MultiClientExecute(connectionId, partitionValueQuery->data,
+								 &queryResult, &rowCount, &columnCount);
+	if (!queryOK)
+	{
+		MultiClientDisconnect(connectionId);
+		return false;
+	}
+
+	minValueIsNull = MultiClientValueIsNull(queryResult, 0, minValueIndex);
+	maxValueIsNull = MultiClientValueIsNull(queryResult, 0, maxValueIndex);
+
+	if (!minValueIsNull && !maxValueIsNull)
+	{
+		char *minValueResult = MultiClientGetValue(queryResult, 0, minValueIndex);
+		char *maxValueResult = MultiClientGetValue(queryResult, 0, maxValueIndex);
+
+		*shardMinValue = cstring_to_text(minValueResult);
+		*shardMaxValue = cstring_to_text(maxValueResult);
+	}
+
+	MultiClientClearResult(queryResult);
+	MultiClientDisconnect(connectionId);
+
+	return true;
 }
diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h
index 4b4e5a8fb..060f4c546 100644
--- a/src/include/distributed/master_protocol.h
+++ b/src/include/distributed/master_protocol.h
@@ -58,8 +58,7 @@
 	"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
 #define WORKER_APPEND_TABLE_TO_SHARD \
 	"SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
-#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
-#define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s"
+#define SHARD_RANGE_QUERY "SELECT min(%s), max(%s) FROM %s"
 #define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)"
 #define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)"
 #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s"
diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h
index ab5b9bc38..e9658ee22 100644
--- a/src/include/distributed/multi_client_executor.h
+++ b/src/include/distributed/multi_client_executor.h
@@ -104,6 +104,8 @@ extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
 extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
 extern void MultiClientDisconnect(int32 connectionId);
 extern bool MultiClientConnectionUp(int32 connectionId);
+extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
+							   int *rowCount, int *columnCount);
 extern bool MultiClientSendQuery(int32 connectionId, const char *query);
 extern bool MultiClientCancel(int32 connectionId);
 extern ResultStatus MultiClientResultStatus(int32 connectionId);
@@ -114,6 +116,7 @@ extern bool MultiClientQueryResult(int32 connectionId, void **queryResult,
 extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult,
 											   int *rowCount, int *columnCount);
 extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
+extern bool MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex);
 extern void MultiClientClearResult(void *queryResult);
 extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);
 
diff --git a/src/test/regress/input/multi_append_table_to_shard.source b/src/test/regress/input/multi_append_table_to_shard.source
index 4f453f9e0..e843ff9f0 100644
--- a/src/test/regress/input/multi_append_table_to_shard.source
+++ b/src/test/regress/input/multi_append_table_to_shard.source
@@ -97,3 +97,46 @@ SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_left
 DROP TABLE multi_append_table_to_shard_stage;
 DROP TABLE multi_append_table_to_shard_right;
 DROP TABLE multi_append_table_to_shard_left;
+
+-- Check partitioning by date
+CREATE TABLE multi_append_table_to_shard_date
+(
+	event_date DATE,
+	value INT
+);
+SELECT master_create_distributed_table('multi_append_table_to_shard_date', 'event_date', 'append');
+
+-- Create an empty shard and check that we can query the table
+SELECT master_create_empty_shard('multi_append_table_to_shard_date');
+
+SELECT * FROM multi_append_table_to_shard_date;
+
+-- Stage an empty table and check that we can query the distributed table
+CREATE TABLE multi_append_table_to_shard_stage (LIKE multi_append_table_to_shard_date);
+SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
+FROM
+	pg_dist_shard
+WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
+
+SELECT * FROM multi_append_table_to_shard_date;
+
+-- Stage NULL values and check that we can query the table
+INSERT INTO multi_append_table_to_shard_stage VALUES (NULL, NULL);
+SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
+FROM
+	pg_dist_shard
+WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
+
+SELECT * FROM multi_append_table_to_shard_date;
+
+-- Stage regular values and check that we can query the table
+INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-01-01', 3);
+SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
+FROM
+	pg_dist_shard
+WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
+
+SELECT * FROM multi_append_table_to_shard_date;
+
+DROP TABLE multi_append_table_to_shard_stage;
+DROP TABLE multi_append_table_to_shard_date;
diff --git a/src/test/regress/output/multi_append_table_to_shard.source b/src/test/regress/output/multi_append_table_to_shard.source
index f3a3b3c91..c0b23b9d8 100644
--- a/src/test/regress/output/multi_append_table_to_shard.source
+++ b/src/test/regress/output/multi_append_table_to_shard.source
@@ -151,3 +151,81 @@ SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_left
 DROP TABLE multi_append_table_to_shard_stage;
 DROP TABLE multi_append_table_to_shard_right;
 DROP TABLE multi_append_table_to_shard_left;
+-- Check partitioning by date
+CREATE TABLE multi_append_table_to_shard_date
+(
+	event_date DATE,
+	value INT
+);
+SELECT master_create_distributed_table('multi_append_table_to_shard_date', 'event_date', 'append');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+-- Create an empty shard and check that we can query the table
+SELECT master_create_empty_shard('multi_append_table_to_shard_date');
+ master_create_empty_shard 
+---------------------------
+                    230004
+(1 row)
+
+SELECT * FROM multi_append_table_to_shard_date;
+ event_date | value 
+------------+-------
+(0 rows)
+
+-- Stage an empty table and check that we can query the distributed table
+CREATE TABLE multi_append_table_to_shard_stage (LIKE multi_append_table_to_shard_date);
+SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
+FROM
+	pg_dist_shard
+WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
+ master_append_table_to_shard 
+------------------------------
+                            0
+(1 row)
+
+SELECT * FROM multi_append_table_to_shard_date;
+ event_date | value 
+------------+-------
+(0 rows)
+
+-- Stage NULL values and check that we can query the table
+INSERT INTO multi_append_table_to_shard_stage VALUES (NULL, NULL);
+SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
+FROM
+	pg_dist_shard
+WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
+ master_append_table_to_shard 
+------------------------------
+                    0.0266667
+(1 row)
+
+SELECT * FROM multi_append_table_to_shard_date;
+ event_date | value 
+------------+-------
+            |      
+(1 row)
+
+-- Stage regular values and check that we can query the table
+INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-01-01', 3);
+SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
+FROM
+	pg_dist_shard
+WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
+ master_append_table_to_shard 
+------------------------------
+                    0.0266667
+(1 row)
+
+SELECT * FROM multi_append_table_to_shard_date;
+ event_date | value 
+------------+-------
+            |      
+            |      
+ 01-01-2016 |     3
+(3 rows)
+
+DROP TABLE multi_append_table_to_shard_stage;
+DROP TABLE multi_append_table_to_shard_date;