mirror of https://github.com/citusdata/citus.git
Format csql's stage files
These are entirely Citus-produced, so need full formatting.pull/327/head
parent
19c529f311
commit
920e0c406d
|
@ -26,7 +26,8 @@
|
||||||
static bool FileSize(char *filename, uint64 *fileSize);
|
static bool FileSize(char *filename, uint64 *fileSize);
|
||||||
static PGconn * ConnectToWorkerNode(const char *nodeName, uint32 nodePort,
|
static PGconn * ConnectToWorkerNode(const char *nodeName, uint32 nodePort,
|
||||||
const char *nodeDatabase);
|
const char *nodeDatabase);
|
||||||
static PGresult * ExecuteRemoteCommand(PGconn *remoteConnection, const char *remoteCommand,
|
static PGresult * ExecuteRemoteCommand(PGconn *remoteConnection,
|
||||||
|
const char *remoteCommand,
|
||||||
const char **parameterValues, int parameterCount);
|
const char **parameterValues, int parameterCount);
|
||||||
static TableMetadata * InitTableMetadata(const char *tableName);
|
static TableMetadata * InitTableMetadata(const char *tableName);
|
||||||
static ShardMetadata * InitShardMetadata(int shardPlacementPolicy);
|
static ShardMetadata * InitShardMetadata(int shardPlacementPolicy);
|
||||||
|
@ -41,7 +42,8 @@ static uint64 GetValueUint64(const PGresult *result, int rowNumber, int columnNu
|
||||||
static bool MasterGetTableMetadata(const char *tableName, TableMetadata *tableMetadata);
|
static bool MasterGetTableMetadata(const char *tableName, TableMetadata *tableMetadata);
|
||||||
static bool MasterGetTableDDLEvents(const char *tableName, TableMetadata *tableMetadata);
|
static bool MasterGetTableDDLEvents(const char *tableName, TableMetadata *tableMetadata);
|
||||||
static bool MasterGetNewShardId(ShardMetadata *shardMetadata);
|
static bool MasterGetNewShardId(ShardMetadata *shardMetadata);
|
||||||
static bool MasterGetCandidateNodes(ShardMetadata *shardMetadata, int shardPlacementPolicy);
|
static bool MasterGetCandidateNodes(ShardMetadata *shardMetadata,
|
||||||
|
int shardPlacementPolicy);
|
||||||
static bool MasterInsertShardRow(uint32 logicalRelid, char storageType,
|
static bool MasterInsertShardRow(uint32 logicalRelid, char storageType,
|
||||||
const ShardMetadata *shardMetadata);
|
const ShardMetadata *shardMetadata);
|
||||||
static bool MasterInsertPlacementRows(const ShardMetadata *shardMetadata);
|
static bool MasterInsertPlacementRows(const ShardMetadata *shardMetadata);
|
||||||
|
@ -62,7 +64,8 @@ static bool ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char
|
||||||
static bool TransmitTableData(PGconn *workerNode, uint64 shardId,
|
static bool TransmitTableData(PGconn *workerNode, uint64 shardId,
|
||||||
uint64 shardMaxSize, copy_options *stageOptions,
|
uint64 shardMaxSize, copy_options *stageOptions,
|
||||||
uint64 currentFileOffset, uint64 *nextFileOffset);
|
uint64 currentFileOffset, uint64 *nextFileOffset);
|
||||||
static bool TransmitFile(PGconn *workerNode, const char *localPath, const char *remotePath);
|
static bool TransmitFile(PGconn *workerNode, const char *localPath,
|
||||||
|
const char *remotePath);
|
||||||
static bool FileStreamOK(const copy_options *stageOptions);
|
static bool FileStreamOK(const copy_options *stageOptions);
|
||||||
static PQExpBuffer CreateCopyQueryString(const char *tableName, const char *columnList,
|
static PQExpBuffer CreateCopyQueryString(const char *tableName, const char *columnList,
|
||||||
const char *afterToFrom);
|
const char *afterToFrom);
|
||||||
|
@ -166,7 +169,7 @@ DoStageData(const char *stageCommand)
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
psql_error("\\stage: staging data into hash partitioned tables is not "
|
psql_error("\\stage: staging data into hash partitioned tables is not "
|
||||||
"supported\n");
|
"supported\n");
|
||||||
free_copy_options(stageOptions);
|
free_copy_options(stageOptions);
|
||||||
FreeTableMetadata(tableMetadata);
|
FreeTableMetadata(tableMetadata);
|
||||||
|
|
||||||
|
@ -179,7 +182,7 @@ DoStageData(const char *stageCommand)
|
||||||
bool tableOptionsOK = ColumnarTableOptionsOK(tableMetadata->logicalRelid);
|
bool tableOptionsOK = ColumnarTableOptionsOK(tableMetadata->logicalRelid);
|
||||||
if (!tableOptionsOK)
|
if (!tableOptionsOK)
|
||||||
{
|
{
|
||||||
return false; /* error message already displayed */
|
return false; /* error message already displayed */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,7 +228,7 @@ DoStageData(const char *stageCommand)
|
||||||
*/
|
*/
|
||||||
FreeCommonStageData(stageOptions, tableMetadata, shardMetadataList);
|
FreeCommonStageData(stageOptions, tableMetadata, shardMetadataList);
|
||||||
|
|
||||||
return false; /* abort immediately */
|
return false; /* abort immediately */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* save allocated shard metadata */
|
/* save allocated shard metadata */
|
||||||
|
@ -245,7 +248,7 @@ DoStageData(const char *stageCommand)
|
||||||
*/
|
*/
|
||||||
for (nodeIndex = 0; nodeIndex < shardMetadata->nodeCount; nodeIndex++)
|
for (nodeIndex = 0; nodeIndex < shardMetadata->nodeCount; nodeIndex++)
|
||||||
{
|
{
|
||||||
char *remoteNodeName = shardMetadata->nodeNameList[nodeIndex];
|
char *remoteNodeName = shardMetadata->nodeNameList[nodeIndex];
|
||||||
uint32 remoteNodePort = shardMetadata->nodePortList[nodeIndex];
|
uint32 remoteNodePort = shardMetadata->nodePortList[nodeIndex];
|
||||||
|
|
||||||
PGconn *remoteNode = NULL;
|
PGconn *remoteNode = NULL;
|
||||||
|
@ -341,7 +344,6 @@ DoStageData(const char *stageCommand)
|
||||||
|
|
||||||
/* update current file offset */
|
/* update current file offset */
|
||||||
currentFileOffset = nextFileOffset;
|
currentFileOffset = nextFileOffset;
|
||||||
|
|
||||||
} /* while more file data left for sharding */
|
} /* while more file data left for sharding */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -421,10 +423,10 @@ ExecuteRemoteCommand(PGconn *remoteConnection, const char *remoteCommand,
|
||||||
{
|
{
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
|
|
||||||
const Oid *parameterType = NULL; /* let the backend deduce type */
|
const Oid *parameterType = NULL; /* let the backend deduce type */
|
||||||
const int *parameterLength = NULL; /* text params do not need length */
|
const int *parameterLength = NULL; /* text params do not need length */
|
||||||
const int *parameterFormat = NULL; /* text params have Null by default */
|
const int *parameterFormat = NULL; /* text params have Null by default */
|
||||||
const int resultFormat = 0; /* ask for results in text format */
|
const int resultFormat = 0; /* ask for results in text format */
|
||||||
|
|
||||||
result = PQexecParams(remoteConnection, remoteCommand,
|
result = PQexecParams(remoteConnection, remoteCommand,
|
||||||
parameterCount, parameterType, parameterValues,
|
parameterCount, parameterType, parameterValues,
|
||||||
|
@ -716,7 +718,7 @@ MasterGetTableMetadata(const char *tableName, TableMetadata *tableMetadata)
|
||||||
char *tableStorageType = NULL;
|
char *tableStorageType = NULL;
|
||||||
char *partitionMethod = NULL;
|
char *partitionMethod = NULL;
|
||||||
char *partitionKey = NULL;
|
char *partitionKey = NULL;
|
||||||
int partitionKeyLength = 0;
|
int partitionKeyLength = 0;
|
||||||
uint64 logicalRelid = 0;
|
uint64 logicalRelid = 0;
|
||||||
uint64 shardReplicaCount = 0;
|
uint64 shardReplicaCount = 0;
|
||||||
uint64 shardMaxSize = 0;
|
uint64 shardMaxSize = 0;
|
||||||
|
@ -727,7 +729,7 @@ MasterGetTableMetadata(const char *tableName, TableMetadata *tableMetadata)
|
||||||
parameterValue, parameterCount);
|
parameterValue, parameterCount);
|
||||||
if (result == NULL)
|
if (result == NULL)
|
||||||
{
|
{
|
||||||
return false; /* error message already displayed */
|
return false; /* error message already displayed */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* find column numbers associated with column names */
|
/* find column numbers associated with column names */
|
||||||
|
@ -825,7 +827,7 @@ MasterGetTableDDLEvents(const char *tableName, TableMetadata *tableMetadata)
|
||||||
{
|
{
|
||||||
char *ddlEvent = NULL;
|
char *ddlEvent = NULL;
|
||||||
char *ddlEventValue = PQgetvalue(result, ddlEventIndex, 0);
|
char *ddlEventValue = PQgetvalue(result, ddlEventIndex, 0);
|
||||||
int ddlEventLength = PQgetlength(result, ddlEventIndex, 0);
|
int ddlEventLength = PQgetlength(result, ddlEventIndex, 0);
|
||||||
|
|
||||||
if (ddlEventLength <= 0)
|
if (ddlEventLength <= 0)
|
||||||
{
|
{
|
||||||
|
@ -996,11 +998,11 @@ MasterGetCandidateNodes(ShardMetadata *shardMetadata, int shardPlacementPolicy)
|
||||||
/* walk over fetched node name/port list, and assign them to metadata */
|
/* walk over fetched node name/port list, and assign them to metadata */
|
||||||
for (nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++)
|
for (nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++)
|
||||||
{
|
{
|
||||||
char *nodeName = NULL;
|
char *nodeName = NULL;
|
||||||
uint64 nodePort = 0;
|
uint64 nodePort = 0;
|
||||||
|
|
||||||
char *nodeNameValue = PQgetvalue(result, nodeIndex, nodeNameIndex);
|
char *nodeNameValue = PQgetvalue(result, nodeIndex, nodeNameIndex);
|
||||||
int nodeNameLength = PQgetlength(result, nodeIndex, nodeNameIndex);
|
int nodeNameLength = PQgetlength(result, nodeIndex, nodeNameIndex);
|
||||||
|
|
||||||
if (nodeNameLength <= 0)
|
if (nodeNameLength <= 0)
|
||||||
{
|
{
|
||||||
|
@ -1107,7 +1109,7 @@ MasterInsertPlacementRows(const ShardMetadata *shardMetadata)
|
||||||
bool staged = shardMetadata->nodeStageList[nodeIndex];
|
bool staged = shardMetadata->nodeStageList[nodeIndex];
|
||||||
if (staged)
|
if (staged)
|
||||||
{
|
{
|
||||||
char *nodeName = shardMetadata->nodeNameList[nodeIndex];
|
char *nodeName = shardMetadata->nodeNameList[nodeIndex];
|
||||||
uint32 nodePort = shardMetadata->nodePortList[nodeIndex];
|
uint32 nodePort = shardMetadata->nodePortList[nodeIndex];
|
||||||
|
|
||||||
/* convert parameter to its string representation */
|
/* convert parameter to its string representation */
|
||||||
|
|
|
@ -30,42 +30,44 @@
|
||||||
#define ROLLBACK_COMMAND "ROLLBACK"
|
#define ROLLBACK_COMMAND "ROLLBACK"
|
||||||
|
|
||||||
/* Names of remote function calls to execute on the master. */
|
/* Names of remote function calls to execute on the master. */
|
||||||
#define MASTER_GET_TABLE_METADATA "SELECT * FROM master_get_table_metadata($1::text)"
|
#define MASTER_GET_TABLE_METADATA "SELECT * FROM master_get_table_metadata($1::text)"
|
||||||
#define MASTER_GET_TABLE_DDL_EVENTS "SELECT * FROM master_get_table_ddl_events($1::text)"
|
#define MASTER_GET_TABLE_DDL_EVENTS "SELECT * FROM master_get_table_ddl_events($1::text)"
|
||||||
#define MASTER_GET_NEW_SHARDID "SELECT * FROM master_get_new_shardid()"
|
#define MASTER_GET_NEW_SHARDID "SELECT * FROM master_get_new_shardid()"
|
||||||
#define MASTER_GET_LOCAL_FIRST_CANDIDATE_NODES "SELECT * FROM \
|
#define MASTER_GET_LOCAL_FIRST_CANDIDATE_NODES \
|
||||||
master_get_local_first_candidate_nodes()"
|
"SELECT * FROM master_get_local_first_candidate_nodes()"
|
||||||
#define MASTER_GET_ROUND_ROBIN_CANDIDATE_NODES "SELECT * FROM \
|
#define MASTER_GET_ROUND_ROBIN_CANDIDATE_NODES \
|
||||||
master_get_round_robin_candidate_nodes($1::int8)"
|
"SELECT * FROM master_get_round_robin_candidate_nodes($1::int8)"
|
||||||
|
|
||||||
#define MASTER_INSERT_SHARD_ROW "INSERT INTO pg_dist_shard \
|
#define MASTER_INSERT_SHARD_ROW \
|
||||||
(logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES \
|
"INSERT INTO pg_dist_shard " \
|
||||||
($1::oid, $2::int8, $3::char, $4::text, $5::text)"
|
"(logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES " \
|
||||||
#define MASTER_INSERT_PLACEMENT_ROW "INSERT INTO pg_dist_shard_placement \
|
"($1::oid, $2::int8, $3::char, $4::text, $5::text)"
|
||||||
(shardid, shardstate, shardlength, nodename, nodeport) VALUES \
|
#define MASTER_INSERT_PLACEMENT_ROW \
|
||||||
($1::int8, $2::int4, $3::int8, $4::text, $5::int4)"
|
"INSERT INTO pg_dist_shard_placement " \
|
||||||
|
"(shardid, shardstate, shardlength, nodename, nodeport) VALUES " \
|
||||||
|
"($1::int8, $2::int4, $3::int8, $4::text, $5::int4)"
|
||||||
|
|
||||||
/* Column names used to identify response fields as returned from the master. */
|
/* Column names used to identify response fields as returned from the master. */
|
||||||
#define LOGICAL_RELID_FIELD "logical_relid"
|
#define LOGICAL_RELID_FIELD "logical_relid"
|
||||||
#define PART_STORAGE_TYPE_FIELD "part_storage_type"
|
#define PART_STORAGE_TYPE_FIELD "part_storage_type"
|
||||||
#define PART_METHOD_FIELD "part_method"
|
#define PART_METHOD_FIELD "part_method"
|
||||||
#define PART_KEY_FIELD "part_key"
|
#define PART_KEY_FIELD "part_key"
|
||||||
#define PART_REPLICA_COUNT_FIELD "part_replica_count"
|
#define PART_REPLICA_COUNT_FIELD "part_replica_count"
|
||||||
#define PART_MAX_SIZE_FIELD "part_max_size"
|
#define PART_MAX_SIZE_FIELD "part_max_size"
|
||||||
#define PART_PLACEMENT_POLICY_FIELD "part_placement_policy"
|
#define PART_PLACEMENT_POLICY_FIELD "part_placement_policy"
|
||||||
#define NODE_NAME_FIELD "node_name"
|
#define NODE_NAME_FIELD "node_name"
|
||||||
#define NODE_PORT_FIELD "node_port"
|
#define NODE_PORT_FIELD "node_port"
|
||||||
|
|
||||||
/* the tablename in the overloaded COPY statement is the to-be-transferred file */
|
/* the tablename in the overloaded COPY statement is the to-be-transferred file */
|
||||||
#define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" FROM STDIN WITH (format 'transmit')"
|
#define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" FROM STDIN WITH (format 'transmit')"
|
||||||
#define SHARD_MIN_MAX_COMMAND "SELECT min(%s), max(%s) FROM %s"
|
#define SHARD_MIN_MAX_COMMAND "SELECT min(%s), max(%s) FROM %s"
|
||||||
#define SHARD_TABLE_SIZE_COMMAND "SELECT pg_table_size('%s')"
|
#define SHARD_TABLE_SIZE_COMMAND "SELECT pg_table_size('%s')"
|
||||||
#define SET_FOREIGN_TABLE_FILENAME "ALTER FOREIGN TABLE %s OPTIONS (SET filename '%s')"
|
#define SET_FOREIGN_TABLE_FILENAME "ALTER FOREIGN TABLE %s OPTIONS (SET filename '%s')"
|
||||||
#define GET_COLUMNAR_TABLE_FILENAME_OPTION "SELECT * FROM \
|
#define GET_COLUMNAR_TABLE_FILENAME_OPTION \
|
||||||
(SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table \
|
"SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \
|
||||||
WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';"
|
"WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';"
|
||||||
#define APPLY_SHARD_DDL_COMMAND "SELECT * FROM worker_apply_shard_ddl_command \
|
#define APPLY_SHARD_DDL_COMMAND \
|
||||||
($1::int8, $2::text)"
|
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text)"
|
||||||
#define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')"
|
#define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')"
|
||||||
#define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')"
|
#define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')"
|
||||||
|
|
||||||
|
@ -90,17 +92,16 @@
|
||||||
*/
|
*/
|
||||||
typedef struct TableMetadata
|
typedef struct TableMetadata
|
||||||
{
|
{
|
||||||
uint32 logicalRelid; /* table's relationId on the master */
|
uint32 logicalRelid; /* table's relationId on the master */
|
||||||
char tableStorageType; /* relay file, foreign table, or table */
|
char tableStorageType; /* relay file, foreign table, or table */
|
||||||
char partitionMethod; /* table's partition method */
|
char partitionMethod; /* table's partition method */
|
||||||
char *partitionKey; /* partition key expression */
|
char *partitionKey; /* partition key expression */
|
||||||
uint32 shardReplicaCount; /* shard replication factor */
|
uint32 shardReplicaCount; /* shard replication factor */
|
||||||
uint64 shardMaxSize; /* create new shard when shard reaches max size */
|
uint64 shardMaxSize; /* create new shard when shard reaches max size */
|
||||||
uint32 shardPlacementPolicy; /* policy to use when choosing nodes to place shards */
|
uint32 shardPlacementPolicy; /* policy to use when choosing nodes to place shards */
|
||||||
|
|
||||||
char **ddlEventList; /* DDL statements used for creating new shard */
|
char **ddlEventList; /* DDL statements used for creating new shard */
|
||||||
uint32 ddlEventCount; /* DDL statement count; statement list size */
|
uint32 ddlEventCount; /* DDL statement count; statement list size */
|
||||||
|
|
||||||
} TableMetadata;
|
} TableMetadata;
|
||||||
|
|
||||||
|
|
||||||
|
@ -112,17 +113,16 @@ typedef struct TableMetadata
|
||||||
*/
|
*/
|
||||||
typedef struct ShardMetadata
|
typedef struct ShardMetadata
|
||||||
{
|
{
|
||||||
uint64 shardId; /* global shardId; created on the master node */
|
uint64 shardId; /* global shardId; created on the master node */
|
||||||
|
|
||||||
char **nodeNameList; /* candidate node name list for shard uploading */
|
char **nodeNameList; /* candidate node name list for shard uploading */
|
||||||
uint32 *nodePortList; /* candidate node port list for shard uploading */
|
uint32 *nodePortList; /* candidate node port list for shard uploading */
|
||||||
uint32 nodeCount; /* candidate node count; node list size */
|
uint32 nodeCount; /* candidate node count; node list size */
|
||||||
bool *nodeStageList; /* shard uploaded to corresponding candidate node? */
|
bool *nodeStageList; /* shard uploaded to corresponding candidate node? */
|
||||||
|
|
||||||
char *shardMinValue; /* partition key's minimum value in shard */
|
char *shardMinValue; /* partition key's minimum value in shard */
|
||||||
char *shardMaxValue; /* partition key's maximum value in shard */
|
char *shardMaxValue; /* partition key's maximum value in shard */
|
||||||
uint64 shardSize; /* shard size; updated during staging */
|
uint64 shardSize; /* shard size; updated during staging */
|
||||||
|
|
||||||
} ShardMetadata;
|
} ShardMetadata;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue