Add support for appending to cstore table shards

- Relaxed the check that prevented append operations on cstore
  tables, whose shards' storage type is not SHARD_STORAGE_TABLE
  (a sketch of the cstore detection helper follows this list).
- Used the process utility function to perform the copy operation in
  worker_append_table_to_shard() instead of calling PostgreSQL's
  DoCopy() directly.
- Removed the additional check in master_create_empty_shard(). This
  check was redundant and erroneous, since it ran after the
  CheckDistributedTable() call.
- Modified WorkerTableSize() to retrieve cstore table shard sizes
  correctly.
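
For context: these changes lean on a CStoreTable() helper to recognize cstore_fdw relations. A minimal sketch of how such a check can be written against PostgreSQL's foreign-table catalog APIs, assuming the helper name CStoreTableSketch and the wrapper name "cstore_fdw" (the real helper in the tree may differ):

#include "postgres.h"

#include "catalog/pg_class.h"
#include "foreign/foreign.h"
#include "utils/lsyscache.h"

/* assumed FDW name; cstore_fdw registers its wrapper under this name */
#define CSTORE_FDW_NAME "cstore_fdw"

/*
 * Sketch of a CStoreTable()-style check: a relation is a cstore table if
 * it is a foreign table whose server is backed by the cstore_fdw wrapper.
 */
static bool
CStoreTableSketch(Oid relationId)
{
	bool cstoreTable = false;
	char relationKind = get_rel_relkind(relationId);

	if (relationKind == RELKIND_FOREIGN_TABLE)
	{
		ForeignTable *foreignTable = GetForeignTable(relationId);
		ForeignServer *server = GetForeignServer(foreignTable->serverid);
		ForeignDataWrapper *wrapper = GetForeignDataWrapper(server->fdwid);

		if (strncmp(wrapper->fdwname, CSTORE_FDW_NAME, NAMEDATALEN) == 0)
		{
			cstoreTable = true;
		}
	}

	return cstoreTable;
}

Because cstore_fdw tables are foreign tables, the old relkind-based guards rejected them; the hunks below replace those guards with this kind of wrapper-aware check.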
pull/334/head
Murat Tuncer 2016-02-12 14:41:32 +02:00
parent 334f800016
commit 444f305165
3 changed files with 31 additions and 16 deletions

View File

@@ -45,7 +45,8 @@ static bool WorkerCreateShard(char *nodeName, uint32 nodePort,
 static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
 							 char *shardName, uint64 *shardLength,
 							 text **shardMinValue, text **shardMaxValue);
-static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, char *tableName);
+static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId,
+							  char *tableName);
 static StringInfo WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid relationId,
 									   char *shardName, char *selectQuery);
@@ -77,16 +78,15 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 	List *candidateNodeList = NIL;
 	text *nullMinValue = NULL;
 	text *nullMaxValue = NULL;
-	char tableType = 0;
 	char partitionMethod = 0;
+	char storageType = SHARD_STORAGE_TABLE;
 
 	Oid relationId = ResolveRelationId(relationNameText);
 	CheckDistributedTable(relationId);
 
-	tableType = get_rel_relkind(relationId);
-	if (tableType != RELKIND_RELATION)
+	if (CStoreTable(relationId))
 	{
-		ereport(ERROR, (errmsg("relation \"%s\" is not a regular table", relationName)));
+		storageType = SHARD_STORAGE_COLUMNAR;
 	}
 
 	partitionMethod = PartitionMethod(relationId);
@@ -130,7 +130,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 	CreateShardPlacements(shardId, ddlEventList, candidateNodeList, 0,
 						  ShardReplicationFactor);
 
-	InsertShardRow(relationId, shardId, SHARD_STORAGE_TABLE, nullMinValue, nullMaxValue);
+	InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
 
 	PG_RETURN_INT64(shardId);
 }
@@ -171,9 +171,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
 	Oid relationId = shardInterval->relationId;
+	bool cstoreTable = CStoreTable(relationId);
 	char storageType = shardInterval->storageType;
 
-	if (storageType != SHARD_STORAGE_TABLE)
+	if (storageType != SHARD_STORAGE_TABLE && !cstoreTable)
 	{
 		ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId),
 						errdetail("The underlying shard is not a regular table")));
 	}
@@ -457,7 +458,7 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
 	PG_TRY();
 	{
-		uint64 tableSize = WorkerTableSize(nodeName, nodePort, shardName);
+		uint64 tableSize = WorkerTableSize(nodeName, nodePort, relationId, shardName);
 		StringInfo minValue = WorkerPartitionValue(nodeName, nodePort, relationId,
 												   shardName, SHARD_MIN_VALUE_QUERY);
 		StringInfo maxValue = WorkerPartitionValue(nodeName, nodePort, relationId,
 												   shardName, SHARD_MAX_VALUE_QUERY);
@@ -479,18 +480,27 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
 /*
  * WorkerTableSize queries the worker node to extract the disk space used by the
- * given relation. The function assumes the relation represents a regular table.
+ * given relation. The function assumes the relation represents a regular table or
+ * a cstore_fdw table.
  */
 static uint64
-WorkerTableSize(char *nodeName, uint32 nodePort, char *tableName)
+WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName)
 {
 	uint64 tableSize = 0;
 	List *queryResultList = NIL;
 	StringInfo tableSizeString = NULL;
 	char *tableSizeStringEnd = NULL;
+	bool cstoreTable = CStoreTable(relationId);
 
 	StringInfo tableSizeQuery = makeStringInfo();
 
-	appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, tableName);
+	if (cstoreTable)
+	{
+		appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, tableName);
+	}
+	else
+	{
+		appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, tableName);
+	}
 
 	queryResultList = ExecuteRemoteQuery(nodeName, nodePort, tableSizeQuery);
 	if (queryResultList == NIL)
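
The captured context stops before the result handling. As a rough sketch only, here is one way the worker's textual size result could be converted to a uint64; the helper name is hypothetical and the real parsing in WorkerTableSize() may differ:

#include "postgres.h"

#include <errno.h>
#include <stdlib.h>

/*
 * Sketch: convert the size string returned by the remote size query into a
 * uint64, treating anything other than a plain non-negative integer as an
 * error.
 */
static uint64
ParseTableSizeSketch(const char *tableSizeString)
{
	char *tableSizeStringEnd = NULL;
	uint64 tableSize = 0;

	errno = 0;
	tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0);
	if (errno != 0 || tableSizeStringEnd == NULL || *tableSizeStringEnd != '\0')
	{
		ereport(ERROR, (errmsg("could not extract table size: \"%s\"",
							   tableSizeString)));
	}

	return tableSize;
}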

View File

@@ -994,11 +994,10 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
 	StringInfo remoteCopyCommand = NULL;
 	CopyStmt *localCopyCommand = NULL;
 	RangeVar *localTable = NULL;
-	uint64 copiedRowCount = 0;
 	uint64 shardId = INVALID_SHARD_ID;
 	bool received = false;
 	char *quotedTableName = NULL;
-	const char *queryString = NULL;
+	StringInfo queryString = NULL;
 	const char *schemaName = NULL;
 
 	/* copy remote table's data to this node */
@@ -1032,8 +1031,13 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
 	localTable = makeRangeVar((char *) schemaName, shardNameString->data, -1);
 	localCopyCommand = CopyStatement(localTable, localFilePath->data);
 
-	DoCopy(localCopyCommand, queryString, &copiedRowCount);
-	(void) copiedRowCount;
+	quotedTableName = quote_qualified_identifier(schemaName, shardNameString->data);
+
+	queryString = makeStringInfo();
+	appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, localFilePath->data);
+
+	ProcessUtility((Node *) localCopyCommand, queryString->data,
+				   PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
 
 	/* finally delete the temporary file we created */
 	DeleteFile(localFilePath->data);
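
DoCopy()'s signature has varied across PostgreSQL releases, which is one reason to route the parsed CopyStmt through the standard utility dispatcher instead. A self-contained sketch of the pattern used above; COPY_IN_COMMAND's definition does not appear in this diff, so the format string below is an assumption:

#include "postgres.h"

#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
#include "tcop/utility.h"
#include "utils/builtins.h"

/* assumed shape of the command text; the real COPY_IN_COMMAND may differ */
#define COPY_IN_COMMAND "COPY %s FROM '%s'"

/*
 * Sketch: execute an already-built CopyStmt through ProcessUtility()
 * instead of calling DoCopy() directly, passing a matching command string
 * for error reporting.
 */
static void
ExecuteLocalCopySketch(CopyStmt *localCopyCommand, const char *schemaName,
					   const char *tableName, const char *filePath)
{
	char *quotedTableName = quote_qualified_identifier(schemaName, tableName);
	StringInfo queryString = makeStringInfo();

	appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, filePath);

	ProcessUtility((Node *) localCopyCommand, queryString->data,
				   PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
}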

View File

@@ -56,6 +56,7 @@
 #define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
 #define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s"
 #define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size('%s')"
+#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size('%s')"
 #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s"
 #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s"
 #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s"