diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 1c13237cc..388394856 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -45,7 +45,8 @@ static bool WorkerCreateShard(char *nodeName, uint32 nodePort, static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName, uint64 *shardLength, text **shardMinValue, text **shardMaxValue); -static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, char *tableName); +static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, + char *tableName); static StringInfo WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid relationId, char *shardName, char *selectQuery); @@ -77,16 +78,15 @@ master_create_empty_shard(PG_FUNCTION_ARGS) List *candidateNodeList = NIL; text *nullMinValue = NULL; text *nullMaxValue = NULL; - char tableType = 0; char partitionMethod = 0; + char storageType = SHARD_STORAGE_TABLE; Oid relationId = ResolveRelationId(relationNameText); CheckDistributedTable(relationId); - tableType = get_rel_relkind(relationId); - if (tableType != RELKIND_RELATION) + if (CStoreTable(relationId)) { - ereport(ERROR, (errmsg("relation \"%s\" is not a regular table", relationName))); + storageType = SHARD_STORAGE_COLUMNAR; } partitionMethod = PartitionMethod(relationId); @@ -130,7 +130,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) CreateShardPlacements(shardId, ddlEventList, candidateNodeList, 0, ShardReplicationFactor); - InsertShardRow(relationId, shardId, SHARD_STORAGE_TABLE, nullMinValue, nullMaxValue); + InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue); PG_RETURN_INT64(shardId); } @@ -171,9 +171,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) ShardInterval *shardInterval = LoadShardInterval(shardId); Oid relationId = shardInterval->relationId; + bool cstoreTable = CStoreTable(relationId); char storageType = shardInterval->storageType; - if (storageType != SHARD_STORAGE_TABLE) + if (storageType != SHARD_STORAGE_TABLE && !cstoreTable) { ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId), errdetail("The underlying shard is not a regular table"))); @@ -457,7 +458,7 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam PG_TRY(); { - uint64 tableSize = WorkerTableSize(nodeName, nodePort, shardName); + uint64 tableSize = WorkerTableSize(nodeName, nodePort, relationId, shardName); StringInfo minValue = WorkerPartitionValue(nodeName, nodePort, relationId, shardName, SHARD_MIN_VALUE_QUERY); StringInfo maxValue = WorkerPartitionValue(nodeName, nodePort, relationId, @@ -479,18 +480,27 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam /* * WorkerTableSize queries the worker node to extract the disk space used by the - * given relation. The function assumes the relation represents a regular table. + * given relation. The function assumes the relation represents a regular table or + * a cstore_fdw table. */ static uint64 -WorkerTableSize(char *nodeName, uint32 nodePort, char *tableName) +WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName) { uint64 tableSize = 0; List *queryResultList = NIL; StringInfo tableSizeString = NULL; char *tableSizeStringEnd = NULL; - + bool cstoreTable = CStoreTable(relationId); StringInfo tableSizeQuery = makeStringInfo(); - appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, tableName); + + if (cstoreTable) + { + appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, tableName); + } + else + { + appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, tableName); + } queryResultList = ExecuteRemoteQuery(nodeName, nodePort, tableSizeQuery); if (queryResultList == NIL) diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 0e5b68a1d..10cd02324 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -994,11 +994,10 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) StringInfo remoteCopyCommand = NULL; CopyStmt *localCopyCommand = NULL; RangeVar *localTable = NULL; - uint64 copiedRowCount = 0; uint64 shardId = INVALID_SHARD_ID; bool received = false; char *quotedTableName = NULL; - const char *queryString = NULL; + StringInfo queryString = NULL; const char *schemaName = NULL; /* copy remote table's data to this node */ @@ -1032,8 +1031,13 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) localTable = makeRangeVar((char *) schemaName, shardNameString->data, -1); localCopyCommand = CopyStatement(localTable, localFilePath->data); - DoCopy(localCopyCommand, queryString, &copiedRowCount); - (void) copiedRowCount; + quotedTableName = quote_qualified_identifier(schemaName, shardNameString->data); + + queryString = makeStringInfo(); + appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, localFilePath->data); + + ProcessUtility((Node *) localCopyCommand, queryString->data, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); /* finally delete the temporary file we created */ DeleteFile(localFilePath->data); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index f39ce865b..daa3b2414 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -56,6 +56,7 @@ #define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s" #define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s" #define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size('%s')" +#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size('%s')" #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s" #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s"