diff --git a/src/bin/csql/stage.c b/src/bin/csql/stage.c index 1863b8bcb..de8bd5e9b 100644 --- a/src/bin/csql/stage.c +++ b/src/bin/csql/stage.c @@ -26,7 +26,8 @@ static bool FileSize(char *filename, uint64 *fileSize); static PGconn * ConnectToWorkerNode(const char *nodeName, uint32 nodePort, const char *nodeDatabase); -static PGresult * ExecuteRemoteCommand(PGconn *remoteConnection, const char *remoteCommand, +static PGresult * ExecuteRemoteCommand(PGconn *remoteConnection, + const char *remoteCommand, const char **parameterValues, int parameterCount); static TableMetadata * InitTableMetadata(const char *tableName); static ShardMetadata * InitShardMetadata(int shardPlacementPolicy); @@ -41,7 +42,8 @@ static uint64 GetValueUint64(const PGresult *result, int rowNumber, int columnNu static bool MasterGetTableMetadata(const char *tableName, TableMetadata *tableMetadata); static bool MasterGetTableDDLEvents(const char *tableName, TableMetadata *tableMetadata); static bool MasterGetNewShardId(ShardMetadata *shardMetadata); -static bool MasterGetCandidateNodes(ShardMetadata *shardMetadata, int shardPlacementPolicy); +static bool MasterGetCandidateNodes(ShardMetadata *shardMetadata, + int shardPlacementPolicy); static bool MasterInsertShardRow(uint32 logicalRelid, char storageType, const ShardMetadata *shardMetadata); static bool MasterInsertPlacementRows(const ShardMetadata *shardMetadata); @@ -62,7 +64,8 @@ static bool ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char static bool TransmitTableData(PGconn *workerNode, uint64 shardId, uint64 shardMaxSize, copy_options *stageOptions, uint64 currentFileOffset, uint64 *nextFileOffset); -static bool TransmitFile(PGconn *workerNode, const char *localPath, const char *remotePath); +static bool TransmitFile(PGconn *workerNode, const char *localPath, + const char *remotePath); static bool FileStreamOK(const copy_options *stageOptions); static PQExpBuffer CreateCopyQueryString(const char *tableName, const char *columnList, const char *afterToFrom); @@ -166,7 +169,7 @@ DoStageData(const char *stageCommand) if (partitionMethod == DISTRIBUTE_BY_HASH) { psql_error("\\stage: staging data into hash partitioned tables is not " - "supported\n"); + "supported\n"); free_copy_options(stageOptions); FreeTableMetadata(tableMetadata); @@ -179,7 +182,7 @@ DoStageData(const char *stageCommand) bool tableOptionsOK = ColumnarTableOptionsOK(tableMetadata->logicalRelid); if (!tableOptionsOK) { - return false; /* error message already displayed */ + return false; /* error message already displayed */ } } @@ -225,7 +228,7 @@ DoStageData(const char *stageCommand) */ FreeCommonStageData(stageOptions, tableMetadata, shardMetadataList); - return false; /* abort immediately */ + return false; /* abort immediately */ } /* save allocated shard metadata */ @@ -245,7 +248,7 @@ DoStageData(const char *stageCommand) */ for (nodeIndex = 0; nodeIndex < shardMetadata->nodeCount; nodeIndex++) { - char *remoteNodeName = shardMetadata->nodeNameList[nodeIndex]; + char *remoteNodeName = shardMetadata->nodeNameList[nodeIndex]; uint32 remoteNodePort = shardMetadata->nodePortList[nodeIndex]; PGconn *remoteNode = NULL; @@ -341,7 +344,6 @@ DoStageData(const char *stageCommand) /* update current file offset */ currentFileOffset = nextFileOffset; - } /* while more file data left for sharding */ /* @@ -390,9 +392,9 @@ ConnectToWorkerNode(const char *nodeName, uint32 nodePort, const char *nodeDatab char nodePortString[MAXPGPATH]; char connInfoString[MAXPGPATH]; - /* transcribe port number and connection info to their string values */ + /* transcribe port number and connection info to their string values */ snprintf(nodePortString, MAXPGPATH, "%u", nodePort); - snprintf(connInfoString, MAXPGPATH, CONN_INFO_TEMPLATE, + snprintf(connInfoString, MAXPGPATH, CONN_INFO_TEMPLATE, nodeDatabase, CLIENT_CONNECT_TIMEOUT); workerNode = PQsetdb(nodeName, nodePortString, nodeOptions, nodeTty, connInfoString); @@ -421,16 +423,16 @@ ExecuteRemoteCommand(PGconn *remoteConnection, const char *remoteCommand, { PGresult *result = NULL; - const Oid *parameterType = NULL; /* let the backend deduce type */ + const Oid *parameterType = NULL; /* let the backend deduce type */ const int *parameterLength = NULL; /* text params do not need length */ const int *parameterFormat = NULL; /* text params have Null by default */ - const int resultFormat = 0; /* ask for results in text format */ + const int resultFormat = 0; /* ask for results in text format */ result = PQexecParams(remoteConnection, remoteCommand, parameterCount, parameterType, parameterValues, parameterLength, parameterFormat, resultFormat); - if (PQresultStatus(result) != PGRES_COMMAND_OK && + if (PQresultStatus(result) != PGRES_COMMAND_OK && PQresultStatus(result) != PGRES_TUPLES_OK) { psql_error("remote command \"%s\" failed with %s", @@ -488,7 +490,7 @@ FreeTableMetadata(TableMetadata *tableMetadata) for (eventIndex = 0; eventIndex < eventCount; eventIndex++) { char *ddlEvent = tableMetadata->ddlEventList[eventIndex]; - + free(ddlEvent); ddlEvent = NULL; } @@ -552,7 +554,7 @@ FreeShardMetadata(ShardMetadata *shardMetadata) for (nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++) { char *nodeName = shardMetadata->nodeNameList[nodeIndex]; - + free(nodeName); nodeName = NULL; } @@ -655,7 +657,7 @@ ExtendTablename(const char *baseTablename, uint64 shardId) { char *extendedTablename = (char *) pg_malloc0(NAMEDATALEN); - snprintf(extendedTablename, NAMEDATALEN, "%s%c" UINT64_FORMAT, + snprintf(extendedTablename, NAMEDATALEN, "%s%c" UINT64_FORMAT, baseTablename, SHARD_NAME_SEPARATOR, shardId); return extendedTablename; @@ -678,7 +680,7 @@ GetValueUint64(const PGresult *result, int rowNumber, int columnNumber) errno = 0; value = strtoull(valueString, &valueStringEnd, 0); - + if (errno != 0 || (*valueStringEnd) != '\0') { return INVALID_UINT64; @@ -716,7 +718,7 @@ MasterGetTableMetadata(const char *tableName, TableMetadata *tableMetadata) char *tableStorageType = NULL; char *partitionMethod = NULL; char *partitionKey = NULL; - int partitionKeyLength = 0; + int partitionKeyLength = 0; uint64 logicalRelid = 0; uint64 shardReplicaCount = 0; uint64 shardMaxSize = 0; @@ -727,7 +729,7 @@ MasterGetTableMetadata(const char *tableName, TableMetadata *tableMetadata) parameterValue, parameterCount); if (result == NULL) { - return false; /* error message already displayed */ + return false; /* error message already displayed */ } /* find column numbers associated with column names */ @@ -798,13 +800,13 @@ MasterGetTableDDLEvents(const char *tableName, TableMetadata *tableMetadata) int ddlEventIndex = 0; /* fetch DDL events needed for table creation */ - result = ExecuteRemoteCommand(masterNode, remoteCommand, + result = ExecuteRemoteCommand(masterNode, remoteCommand, parameterValue, parameterCount); if (result == NULL) { return false; } - + /* check that we have at least one DDL event */ ddlEventCount = PQntuples(result); if (ddlEventCount <= 0) @@ -825,7 +827,7 @@ MasterGetTableDDLEvents(const char *tableName, TableMetadata *tableMetadata) { char *ddlEvent = NULL; char *ddlEventValue = PQgetvalue(result, ddlEventIndex, 0); - int ddlEventLength = PQgetlength(result, ddlEventIndex, 0); + int ddlEventLength = PQgetlength(result, ddlEventIndex, 0); if (ddlEventLength <= 0) { @@ -866,7 +868,7 @@ MasterGetNewShardId(ShardMetadata *shardMetadata) uint64 shardId = 0; /* fetch unique shardId for shard to be created */ - result = ExecuteRemoteCommand(masterNode, remoteCommand, + result = ExecuteRemoteCommand(masterNode, remoteCommand, parameterValue, parameterCount); if (result == NULL) { @@ -877,7 +879,7 @@ MasterGetNewShardId(ShardMetadata *shardMetadata) shardId = GetValueUint64(result, 0, 0); if (shardId == INVALID_UINT64) { - psql_error("remote command \"%s\" failed with invalid shardId\n", + psql_error("remote command \"%s\" failed with invalid shardId\n", remoteCommand); PQclear(result); @@ -996,11 +998,11 @@ MasterGetCandidateNodes(ShardMetadata *shardMetadata, int shardPlacementPolicy) /* walk over fetched node name/port list, and assign them to metadata */ for (nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++) { - char *nodeName = NULL; + char *nodeName = NULL; uint64 nodePort = 0; - char *nodeNameValue = PQgetvalue(result, nodeIndex, nodeNameIndex); - int nodeNameLength = PQgetlength(result, nodeIndex, nodeNameIndex); + char *nodeNameValue = PQgetvalue(result, nodeIndex, nodeNameIndex); + int nodeNameLength = PQgetlength(result, nodeIndex, nodeNameIndex); if (nodeNameLength <= 0) { @@ -1014,7 +1016,7 @@ MasterGetCandidateNodes(ShardMetadata *shardMetadata, int shardPlacementPolicy) /* deep copy node name and assign to metadata */ nodeName = (char *) pg_malloc0(nodeNameLength + 1); strncpy(nodeName, nodeNameValue, nodeNameLength + 1); - + shardMetadata->nodeNameList[nodeIndex] = nodeName; /* convert port value string to 64-bit integer, and assign to metadata */ @@ -1024,7 +1026,7 @@ MasterGetCandidateNodes(ShardMetadata *shardMetadata, int shardPlacementPolicy) psql_error("remote command \"%s\" failed to fetch valid port number\n", remoteCommand); PQclear(result); - + return false; } @@ -1107,12 +1109,12 @@ MasterInsertPlacementRows(const ShardMetadata *shardMetadata) bool staged = shardMetadata->nodeStageList[nodeIndex]; if (staged) { - char *nodeName = shardMetadata->nodeNameList[nodeIndex]; + char *nodeName = shardMetadata->nodeNameList[nodeIndex]; uint32 nodePort = shardMetadata->nodePortList[nodeIndex]; /* convert parameter to its string representation */ snprintf(nodePortString, NAMEDATALEN, "%u", nodePort); - + parameterValue[3] = nodeName; parameterValue[4] = nodePortString; @@ -1136,7 +1138,7 @@ MasterInsertPlacementRows(const ShardMetadata *shardMetadata) * staged to worker nodes. The function executes shard metadata insert commands * within a single transaction so that either all or none of the metadata are * finalized. On success, the function commits the transaction and returns true. - * On failure, the function rolls back the transaction and returns false. + * On failure, the function rolls back the transaction and returns false. */ static bool MasterInsertShardMetadata(uint32 logicalRelid, char storageType, @@ -1199,7 +1201,7 @@ IssueTransactionCommand(PGconn *connection, const char *command) return false; } - + PQclear(result); return true; } @@ -1729,7 +1731,7 @@ ShardColumnarTableSize(PGconn *workerNode, const char *tablename, uint64 shardId * failure, the function returns false. */ static bool -ShardMinMaxValues(PGconn *workerNode, const char *tablename, +ShardMinMaxValues(PGconn *workerNode, const char *tablename, const char *partitionKey, ShardMetadata *shardMetadata) { const int MinValueIndex = 0; @@ -1744,7 +1746,7 @@ ShardMinMaxValues(PGconn *workerNode, const char *tablename, int maxValueLength = 0; extendedTablename = ExtendTablename(tablename, shardMetadata->shardId); - snprintf(remoteCommand, MAXPGPATH, SHARD_MIN_MAX_COMMAND, + snprintf(remoteCommand, MAXPGPATH, SHARD_MIN_MAX_COMMAND, partitionKey, partitionKey, extendedTablename); result = PQexec(workerNode, remoteCommand); diff --git a/src/bin/csql/stage.h b/src/bin/csql/stage.h index 01575f886..0863adff5 100644 --- a/src/bin/csql/stage.h +++ b/src/bin/csql/stage.h @@ -30,42 +30,44 @@ #define ROLLBACK_COMMAND "ROLLBACK" /* Names of remote function calls to execute on the master. */ -#define MASTER_GET_TABLE_METADATA "SELECT * FROM master_get_table_metadata($1::text)" +#define MASTER_GET_TABLE_METADATA "SELECT * FROM master_get_table_metadata($1::text)" #define MASTER_GET_TABLE_DDL_EVENTS "SELECT * FROM master_get_table_ddl_events($1::text)" -#define MASTER_GET_NEW_SHARDID "SELECT * FROM master_get_new_shardid()" -#define MASTER_GET_LOCAL_FIRST_CANDIDATE_NODES "SELECT * FROM \ - master_get_local_first_candidate_nodes()" -#define MASTER_GET_ROUND_ROBIN_CANDIDATE_NODES "SELECT * FROM \ - master_get_round_robin_candidate_nodes($1::int8)" +#define MASTER_GET_NEW_SHARDID "SELECT * FROM master_get_new_shardid()" +#define MASTER_GET_LOCAL_FIRST_CANDIDATE_NODES \ + "SELECT * FROM master_get_local_first_candidate_nodes()" +#define MASTER_GET_ROUND_ROBIN_CANDIDATE_NODES \ + "SELECT * FROM master_get_round_robin_candidate_nodes($1::int8)" -#define MASTER_INSERT_SHARD_ROW "INSERT INTO pg_dist_shard \ - (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES \ - ($1::oid, $2::int8, $3::char, $4::text, $5::text)" -#define MASTER_INSERT_PLACEMENT_ROW "INSERT INTO pg_dist_shard_placement \ - (shardid, shardstate, shardlength, nodename, nodeport) VALUES \ - ($1::int8, $2::int4, $3::int8, $4::text, $5::int4)" +#define MASTER_INSERT_SHARD_ROW \ + "INSERT INTO pg_dist_shard " \ + "(logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES " \ + "($1::oid, $2::int8, $3::char, $4::text, $5::text)" +#define MASTER_INSERT_PLACEMENT_ROW \ + "INSERT INTO pg_dist_shard_placement " \ + "(shardid, shardstate, shardlength, nodename, nodeport) VALUES " \ + "($1::int8, $2::int4, $3::int8, $4::text, $5::int4)" /* Column names used to identify response fields as returned from the master. */ -#define LOGICAL_RELID_FIELD "logical_relid" -#define PART_STORAGE_TYPE_FIELD "part_storage_type" -#define PART_METHOD_FIELD "part_method" -#define PART_KEY_FIELD "part_key" -#define PART_REPLICA_COUNT_FIELD "part_replica_count" -#define PART_MAX_SIZE_FIELD "part_max_size" -#define PART_PLACEMENT_POLICY_FIELD "part_placement_policy" -#define NODE_NAME_FIELD "node_name" -#define NODE_PORT_FIELD "node_port" +#define LOGICAL_RELID_FIELD "logical_relid" +#define PART_STORAGE_TYPE_FIELD "part_storage_type" +#define PART_METHOD_FIELD "part_method" +#define PART_KEY_FIELD "part_key" +#define PART_REPLICA_COUNT_FIELD "part_replica_count" +#define PART_MAX_SIZE_FIELD "part_max_size" +#define PART_PLACEMENT_POLICY_FIELD "part_placement_policy" +#define NODE_NAME_FIELD "node_name" +#define NODE_PORT_FIELD "node_port" /* the tablename in the overloaded COPY statement is the to-be-transferred file */ #define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" FROM STDIN WITH (format 'transmit')" -#define SHARD_MIN_MAX_COMMAND "SELECT min(%s), max(%s) FROM %s" +#define SHARD_MIN_MAX_COMMAND "SELECT min(%s), max(%s) FROM %s" #define SHARD_TABLE_SIZE_COMMAND "SELECT pg_table_size('%s')" #define SET_FOREIGN_TABLE_FILENAME "ALTER FOREIGN TABLE %s OPTIONS (SET filename '%s')" -#define GET_COLUMNAR_TABLE_FILENAME_OPTION "SELECT * FROM \ - (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table \ - WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';" -#define APPLY_SHARD_DDL_COMMAND "SELECT * FROM worker_apply_shard_ddl_command \ - ($1::int8, $2::text)" +#define GET_COLUMNAR_TABLE_FILENAME_OPTION \ + "SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \ + "WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';" +#define APPLY_SHARD_DDL_COMMAND \ + "SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text)" #define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')" #define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')" @@ -90,17 +92,16 @@ */ typedef struct TableMetadata { - uint32 logicalRelid; /* table's relationId on the master */ - char tableStorageType; /* relay file, foreign table, or table */ - char partitionMethod; /* table's partition method */ - char *partitionKey; /* partition key expression */ - uint32 shardReplicaCount; /* shard replication factor */ - uint64 shardMaxSize; /* create new shard when shard reaches max size */ + uint32 logicalRelid; /* table's relationId on the master */ + char tableStorageType; /* relay file, foreign table, or table */ + char partitionMethod; /* table's partition method */ + char *partitionKey; /* partition key expression */ + uint32 shardReplicaCount; /* shard replication factor */ + uint64 shardMaxSize; /* create new shard when shard reaches max size */ uint32 shardPlacementPolicy; /* policy to use when choosing nodes to place shards */ char **ddlEventList; /* DDL statements used for creating new shard */ uint32 ddlEventCount; /* DDL statement count; statement list size */ - } TableMetadata; @@ -112,17 +113,16 @@ typedef struct TableMetadata */ typedef struct ShardMetadata { - uint64 shardId; /* global shardId; created on the master node */ + uint64 shardId; /* global shardId; created on the master node */ - char **nodeNameList; /* candidate node name list for shard uploading */ + char **nodeNameList; /* candidate node name list for shard uploading */ uint32 *nodePortList; /* candidate node port list for shard uploading */ - uint32 nodeCount; /* candidate node count; node list size */ - bool *nodeStageList; /* shard uploaded to corresponding candidate node? */ + uint32 nodeCount; /* candidate node count; node list size */ + bool *nodeStageList; /* shard uploaded to corresponding candidate node? */ char *shardMinValue; /* partition key's minimum value in shard */ char *shardMaxValue; /* partition key's maximum value in shard */ - uint64 shardSize; /* shard size; updated during staging */ - + uint64 shardSize; /* shard size; updated during staging */ } ShardMetadata;