mirror of https://github.com/citusdata/citus.git
Run check_indent on COPY changes
parent
be86f40f4c
commit
0fb1b9eea4
|
@ -119,12 +119,12 @@ typedef struct CopyStateData
|
||||||
CopyDest copy_dest; /* type of copy source/destination */
|
CopyDest copy_dest; /* type of copy source/destination */
|
||||||
FILE *copy_file; /* used if copy_dest == COPY_FILE */
|
FILE *copy_file; /* used if copy_dest == COPY_FILE */
|
||||||
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
|
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
|
||||||
* dest == COPY_NEW_FE in COPY FROM */
|
* dest == COPY_NEW_FE in COPY FROM */
|
||||||
bool fe_eof; /* true if detected end of copy data */
|
bool fe_eof; /* true if detected end of copy data */
|
||||||
EolType eol_type; /* EOL type of input */
|
EolType eol_type; /* EOL type of input */
|
||||||
int file_encoding; /* file or remote side's character encoding */
|
int file_encoding; /* file or remote side's character encoding */
|
||||||
bool need_transcoding; /* file encoding diff from server? */
|
bool need_transcoding; /* file encoding diff from server? */
|
||||||
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
|
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
|
||||||
|
|
||||||
/* parameters from the COPY command */
|
/* parameters from the COPY command */
|
||||||
Relation rel; /* relation to copy to or from */
|
Relation rel; /* relation to copy to or from */
|
||||||
|
@ -139,22 +139,22 @@ typedef struct CopyStateData
|
||||||
bool header_line; /* CSV header line? */
|
bool header_line; /* CSV header line? */
|
||||||
char *null_print; /* NULL marker string (server encoding!) */
|
char *null_print; /* NULL marker string (server encoding!) */
|
||||||
int null_print_len; /* length of same */
|
int null_print_len; /* length of same */
|
||||||
char *null_print_client; /* same converted to file encoding */
|
char *null_print_client; /* same converted to file encoding */
|
||||||
char *delim; /* column delimiter (must be 1 byte) */
|
char *delim; /* column delimiter (must be 1 byte) */
|
||||||
char *quote; /* CSV quote char (must be 1 byte) */
|
char *quote; /* CSV quote char (must be 1 byte) */
|
||||||
char *escape; /* CSV escape char (must be 1 byte) */
|
char *escape; /* CSV escape char (must be 1 byte) */
|
||||||
List *force_quote; /* list of column names */
|
List *force_quote; /* list of column names */
|
||||||
bool force_quote_all; /* FORCE QUOTE *? */
|
bool force_quote_all; /* FORCE QUOTE *? */
|
||||||
bool *force_quote_flags; /* per-column CSV FQ flags */
|
bool *force_quote_flags; /* per-column CSV FQ flags */
|
||||||
List *force_notnull; /* list of column names */
|
List *force_notnull; /* list of column names */
|
||||||
bool *force_notnull_flags; /* per-column CSV FNN flags */
|
bool *force_notnull_flags; /* per-column CSV FNN flags */
|
||||||
#if PG_VERSION_NUM >= 90400
|
#if PG_VERSION_NUM >= 90400
|
||||||
List *force_null; /* list of column names */
|
List *force_null; /* list of column names */
|
||||||
bool *force_null_flags; /* per-column CSV FN flags */
|
bool *force_null_flags; /* per-column CSV FN flags */
|
||||||
#endif
|
#endif
|
||||||
bool convert_selectively; /* do selective binary conversion? */
|
bool convert_selectively; /* do selective binary conversion? */
|
||||||
List *convert_select; /* list of column names (can be NIL) */
|
List *convert_select; /* list of column names (can be NIL) */
|
||||||
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
|
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
|
||||||
|
|
||||||
/* these are just for error messages, see CopyFromErrorCallback */
|
/* these are just for error messages, see CopyFromErrorCallback */
|
||||||
const char *cur_relname; /* table name for error messages */
|
const char *cur_relname; /* table name for error messages */
|
||||||
|
@ -210,7 +210,7 @@ typedef struct CopyStateData
|
||||||
* can display it in error messages if appropriate.
|
* can display it in error messages if appropriate.
|
||||||
*/
|
*/
|
||||||
StringInfoData line_buf;
|
StringInfoData line_buf;
|
||||||
bool line_buf_converted; /* converted to server encoding? */
|
bool line_buf_converted; /* converted to server encoding? */
|
||||||
bool line_buf_valid; /* contains the row being processed? */
|
bool line_buf_valid; /* contains the row being processed? */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -239,14 +239,15 @@ typedef struct ShardConnections
|
||||||
static HTAB * CreateShardConnectionHash(void);
|
static HTAB * CreateShardConnectionHash(void);
|
||||||
static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray,
|
static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray,
|
||||||
int shardCount);
|
int shardCount);
|
||||||
static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod);
|
static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char
|
||||||
|
partitionMethod);
|
||||||
static ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
static ShardInterval * FindShardInterval(Datum partitionColumnValue,
|
||||||
ShardInterval **shardIntervalCache,
|
ShardInterval **shardIntervalCache,
|
||||||
int shardCount, char partitionMethod,
|
int shardCount, char partitionMethod,
|
||||||
FmgrInfo *compareFunction,
|
FmgrInfo *compareFunction,
|
||||||
FmgrInfo *hashFunction, bool useBinarySearch);
|
FmgrInfo *hashFunction, bool useBinarySearch);
|
||||||
static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
|
static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
|
||||||
ShardInterval** shardIntervalCache,
|
ShardInterval **shardIntervalCache,
|
||||||
int shardCount,
|
int shardCount,
|
||||||
FmgrInfo *compareFunction);
|
FmgrInfo *compareFunction);
|
||||||
static void OpenShardConnections(CopyStmt *copyStatement,
|
static void OpenShardConnections(CopyStmt *copyStatement,
|
||||||
|
@ -266,7 +267,7 @@ static void ReportCopyError(PGconn *connection, PGresult *result);
|
||||||
* and range-partitioned tables.
|
* and range-partitioned tables.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
|
CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||||
{
|
{
|
||||||
RangeVar *relation = copyStatement->relation;
|
RangeVar *relation = copyStatement->relation;
|
||||||
Oid tableId = RangeVarGetRelid(relation, NoLock, false);
|
Oid tableId = RangeVarGetRelid(relation, NoLock, false);
|
||||||
|
@ -319,7 +320,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
|
||||||
if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH)
|
if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("COPY is only supported for hash- and range-partitioned tables")));
|
errmsg(
|
||||||
|
"COPY is only supported for hash- and range-partitioned tables")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* resolve hash function for partition column */
|
/* resolve hash function for partition column */
|
||||||
|
@ -431,7 +433,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
|
||||||
ShardConnections *shardConnections = NULL;
|
ShardConnections *shardConnections = NULL;
|
||||||
bool found = false;
|
bool found = false;
|
||||||
StringInfo lineBuf = NULL;
|
StringInfo lineBuf = NULL;
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
|
|
||||||
oldContext = MemoryContextSwitchTo(tupleContext);
|
oldContext = MemoryContextSwitchTo(tupleContext);
|
||||||
|
|
||||||
|
@ -450,14 +452,14 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
|
||||||
|
|
||||||
/* find the partition column value */
|
/* find the partition column value */
|
||||||
|
|
||||||
if (columnNulls[partitionColumn->varattno-1])
|
if (columnNulls[partitionColumn->varattno - 1])
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||||
errmsg("cannot copy row with NULL value "
|
errmsg("cannot copy row with NULL value "
|
||||||
"in partition column")));
|
"in partition column")));
|
||||||
}
|
}
|
||||||
|
|
||||||
partitionColumnValue = columnValues[partitionColumn->varattno-1];
|
partitionColumnValue = columnValues[partitionColumn->varattno - 1];
|
||||||
|
|
||||||
/* find the shard interval and id for the partition column value */
|
/* find the shard interval and id for the partition column value */
|
||||||
shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache,
|
shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache,
|
||||||
|
@ -649,7 +651,7 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
uint32 hashTokenIncrement = (uint32) (HASH_TOKEN_COUNT / shardCount);
|
uint32 hashTokenIncrement = (uint32) (HASH_TOKEN_COUNT / shardCount);
|
||||||
int shardHashCode = ((uint32) (hashedValue-INT32_MIN)/hashTokenIncrement);
|
int shardHashCode = ((uint32) (hashedValue - INT32_MIN) / hashTokenIncrement);
|
||||||
|
|
||||||
shardInterval = shardIntervalCache[shardHashCode];
|
shardInterval = shardIntervalCache[shardHashCode];
|
||||||
}
|
}
|
||||||
|
@ -670,7 +672,7 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache
|
||||||
* given partition column value and returns it.
|
* given partition column value and returns it.
|
||||||
*/
|
*/
|
||||||
static ShardInterval *
|
static ShardInterval *
|
||||||
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval** shardIntervalCache,
|
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
|
||||||
int shardCount, FmgrInfo *compareFunction)
|
int shardCount, FmgrInfo *compareFunction)
|
||||||
{
|
{
|
||||||
int lowerBoundIndex = 0;
|
int lowerBoundIndex = 0;
|
||||||
|
@ -682,17 +684,18 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval** shardInter
|
||||||
if (DatumGetInt32(FunctionCall2Coll(compareFunction,
|
if (DatumGetInt32(FunctionCall2Coll(compareFunction,
|
||||||
DEFAULT_COLLATION_OID,
|
DEFAULT_COLLATION_OID,
|
||||||
partitionColumnValue,
|
partitionColumnValue,
|
||||||
shardIntervalCache[middleIndex]->minValue)) < 0)
|
shardIntervalCache[middleIndex]->minValue)) <
|
||||||
|
0)
|
||||||
{
|
{
|
||||||
upperBoundIndex = middleIndex;
|
upperBoundIndex = middleIndex;
|
||||||
}
|
}
|
||||||
else if (DatumGetInt32(FunctionCall2Coll(compareFunction,
|
else if (DatumGetInt32(FunctionCall2Coll(compareFunction,
|
||||||
DEFAULT_COLLATION_OID,
|
DEFAULT_COLLATION_OID,
|
||||||
partitionColumnValue,
|
partitionColumnValue,
|
||||||
shardIntervalCache[middleIndex]->maxValue)) <= 0)
|
shardIntervalCache[middleIndex]->maxValue))
|
||||||
|
<= 0)
|
||||||
{
|
{
|
||||||
return shardIntervalCache[middleIndex];
|
return shardIntervalCache[middleIndex];
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -848,8 +851,9 @@ AppendCopyOptions(StringInfo command, List *copyOptionList)
|
||||||
{
|
{
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
errmsg("argument to option \"%s\" must be a list of column names",
|
errmsg(
|
||||||
option->defname)));
|
"argument to option \"%s\" must be a list of column names",
|
||||||
|
option->defname)));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -909,8 +913,8 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections)
|
||||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
errmsg("Failed to COPY to shard %ld on %s:%s",
|
errmsg("Failed to COPY to shard %ld on %s:%s",
|
||||||
shardId, nodeName, nodePort)));
|
shardId, nodeName, nodePort)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -924,7 +928,7 @@ ConnectionList(HTAB *connectionHash)
|
||||||
{
|
{
|
||||||
List *connectionList = NIL;
|
List *connectionList = NIL;
|
||||||
HASH_SEQ_STATUS status;
|
HASH_SEQ_STATUS status;
|
||||||
ShardConnections* shardConnections = NULL;
|
ShardConnections *shardConnections = NULL;
|
||||||
|
|
||||||
hash_seq_init(&status, connectionHash);
|
hash_seq_init(&status, connectionHash);
|
||||||
while ((shardConnections = (ShardConnections *) hash_seq_search(&status)) != NULL)
|
while ((shardConnections = (ShardConnections *) hash_seq_search(&status)) != NULL)
|
||||||
|
|
|
@ -69,7 +69,7 @@ static const struct config_enum_entry shard_placement_policy_options[] = {
|
||||||
{ NULL, 0, false }
|
{ NULL, 0, false }
|
||||||
};
|
};
|
||||||
|
|
||||||
static const struct config_enum_entry transaction_manager_options[] = {
|
static const struct config_enum_entry transaction_manager_options[] = {
|
||||||
{ "1pc", TRANSACTION_MANAGER_1PC, false },
|
{ "1pc", TRANSACTION_MANAGER_1PC, false },
|
||||||
{ "2pc", TRANSACTION_MANAGER_2PC, false },
|
{ "2pc", TRANSACTION_MANAGER_2PC, false },
|
||||||
{ NULL, 0, false }
|
{ NULL, 0, false }
|
||||||
|
|
|
@ -97,7 +97,7 @@ AbortTransactions(List *connectionList)
|
||||||
ereport(WARNING, (errmsg("Failed to roll back prepared transaction '%s'",
|
ereport(WARNING, (errmsg("Failed to roll back prepared transaction '%s'",
|
||||||
transactionName->data),
|
transactionName->data),
|
||||||
errhint("Run ROLLBACK TRANSACTION '%s' on %s:%s",
|
errhint("Run ROLLBACK TRANSACTION '%s' on %s:%s",
|
||||||
transactionName->data, nodeName, nodePort)));
|
transactionName->data, nodeName, nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
@ -150,7 +150,7 @@ CommitTransactions(List *connectionList)
|
||||||
ereport(WARNING, (errmsg("Failed to commit prepared transaction '%s'",
|
ereport(WARNING, (errmsg("Failed to commit prepared transaction '%s'",
|
||||||
transactionName->data),
|
transactionName->data),
|
||||||
errhint("Run COMMIT TRANSACTION '%s' on %s:%s",
|
errhint("Run COMMIT TRANSACTION '%s' on %s:%s",
|
||||||
transactionName->data, nodeName, nodePort)));
|
transactionName->data, nodeName, nodePort)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -183,7 +183,7 @@ BuildTransactionName(int connectionId)
|
||||||
StringInfo commandString = makeStringInfo();
|
StringInfo commandString = makeStringInfo();
|
||||||
|
|
||||||
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
|
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
|
||||||
GetCurrentTransactionId(), connectionId);
|
GetCurrentTransactionId(), connectionId);
|
||||||
|
|
||||||
return commandString;
|
return commandString;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ extern int CopyTransactionManager;
|
||||||
|
|
||||||
|
|
||||||
/* function declarations for copying into a distributed table */
|
/* function declarations for copying into a distributed table */
|
||||||
extern void CitusCopyFrom(CopyStmt *copyStatement, char* completionTag);
|
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_COPY_H */
|
#endif /* MULTI_COPY_H */
|
||||||
|
|
|
@ -43,7 +43,7 @@ typedef struct TransactionConnection
|
||||||
{
|
{
|
||||||
int64 connectionId;
|
int64 connectionId;
|
||||||
TransactionState transactionState;
|
TransactionState transactionState;
|
||||||
PGconn* connection;
|
PGconn *connection;
|
||||||
} TransactionConnection;
|
} TransactionConnection;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue