diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 701293bbe..54d6baf71 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -119,12 +119,12 @@ typedef struct CopyStateData CopyDest copy_dest; /* type of copy source/destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for - * dest == COPY_NEW_FE in COPY FROM */ + * dest == COPY_NEW_FE in COPY FROM */ bool fe_eof; /* true if detected end of copy data */ EolType eol_type; /* EOL type of input */ int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ @@ -139,22 +139,22 @@ typedef struct CopyStateData bool header_line; /* CSV header line? */ char *null_print; /* NULL marker string (server encoding!) */ int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ + char *null_print_client; /* same converted to file encoding */ char *delim; /* column delimiter (must be 1 byte) */ char *quote; /* CSV quote char (must be 1 byte) */ char *escape; /* CSV escape char (must be 1 byte) */ List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ + bool force_quote_all; /* FORCE QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ List *force_notnull; /* list of column names */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ #if PG_VERSION_NUM >= 90400 List *force_null; /* list of column names */ - bool *force_null_flags; /* per-column CSV FN flags */ + bool *force_null_flags; /* per-column CSV FN flags */ #endif - bool convert_selectively; /* do selective binary conversion? */ + bool convert_selectively; /* do selective binary conversion? */ List *convert_select; /* list of column names (can be NIL) */ - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -210,7 +210,7 @@ typedef struct CopyStateData * can display it in error messages if appropriate. */ StringInfoData line_buf; - bool line_buf_converted; /* converted to server encoding? */ + bool line_buf_converted; /* converted to server encoding? */ bool line_buf_valid; /* contains the row being processed? */ /* @@ -239,14 +239,15 @@ typedef struct ShardConnections static HTAB * CreateShardConnectionHash(void); static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount); -static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod); +static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char + partitionMethod); static ShardInterval * FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, char partitionMethod, FmgrInfo *compareFunction, FmgrInfo *hashFunction, bool useBinarySearch); static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, - ShardInterval** shardIntervalCache, + ShardInterval **shardIntervalCache, int shardCount, FmgrInfo *compareFunction); static void OpenShardConnections(CopyStmt *copyStatement, @@ -266,7 +267,7 @@ static void ReportCopyError(PGconn *connection, PGresult *result); * and range-partitioned tables. */ void -CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) +CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) { RangeVar *relation = copyStatement->relation; Oid tableId = RangeVarGetRelid(relation, NoLock, false); @@ -319,7 +320,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY is only supported for hash- and range-partitioned tables"))); + errmsg( + "COPY is only supported for hash- and range-partitioned tables"))); } /* resolve hash function for partition column */ @@ -431,7 +433,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) ShardConnections *shardConnections = NULL; bool found = false; StringInfo lineBuf = NULL; - MemoryContext oldContext = NULL; + MemoryContext oldContext = NULL; oldContext = MemoryContextSwitchTo(tupleContext); @@ -450,14 +452,14 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) /* find the partition column value */ - if (columnNulls[partitionColumn->varattno-1]) + if (columnNulls[partitionColumn->varattno - 1]) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("cannot copy row with NULL value " "in partition column"))); } - partitionColumnValue = columnValues[partitionColumn->varattno-1]; + partitionColumnValue = columnValues[partitionColumn->varattno - 1]; /* find the shard interval and id for the partition column value */ shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache, @@ -649,7 +651,7 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache else { uint32 hashTokenIncrement = (uint32) (HASH_TOKEN_COUNT / shardCount); - int shardHashCode = ((uint32) (hashedValue-INT32_MIN)/hashTokenIncrement); + int shardHashCode = ((uint32) (hashedValue - INT32_MIN) / hashTokenIncrement); shardInterval = shardIntervalCache[shardHashCode]; } @@ -670,7 +672,7 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache * given partition column value and returns it. */ static ShardInterval * -SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval** shardIntervalCache, +SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, FmgrInfo *compareFunction) { int lowerBoundIndex = 0; @@ -682,17 +684,18 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval** shardInter if (DatumGetInt32(FunctionCall2Coll(compareFunction, DEFAULT_COLLATION_OID, partitionColumnValue, - shardIntervalCache[middleIndex]->minValue)) < 0) + shardIntervalCache[middleIndex]->minValue)) < + 0) { upperBoundIndex = middleIndex; } else if (DatumGetInt32(FunctionCall2Coll(compareFunction, DEFAULT_COLLATION_OID, partitionColumnValue, - shardIntervalCache[middleIndex]->maxValue)) <= 0) + shardIntervalCache[middleIndex]->maxValue)) + <= 0) { return shardIntervalCache[middleIndex]; - } else { @@ -848,8 +851,9 @@ AppendCopyOptions(StringInfo command, List *copyOptionList) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("argument to option \"%s\" must be a list of column names", - option->defname))); + errmsg( + "argument to option \"%s\" must be a list of column names", + option->defname))); } else { @@ -909,8 +913,8 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections) char *nodeName = ConnectionGetOptionValue(connection, "host"); char *nodePort = ConnectionGetOptionValue(connection, "port"); ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to COPY to shard %ld on %s:%s", - shardId, nodeName, nodePort))); + errmsg("Failed to COPY to shard %ld on %s:%s", + shardId, nodeName, nodePort))); } } } @@ -924,7 +928,7 @@ ConnectionList(HTAB *connectionHash) { List *connectionList = NIL; HASH_SEQ_STATUS status; - ShardConnections* shardConnections = NULL; + ShardConnections *shardConnections = NULL; hash_seq_init(&status, connectionHash); while ((shardConnections = (ShardConnections *) hash_seq_search(&status)) != NULL) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index b941b78fd..526a5d838 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -69,7 +69,7 @@ static const struct config_enum_entry shard_placement_policy_options[] = { { NULL, 0, false } }; -static const struct config_enum_entry transaction_manager_options[] = { +static const struct config_enum_entry transaction_manager_options[] = { { "1pc", TRANSACTION_MANAGER_1PC, false }, { "2pc", TRANSACTION_MANAGER_2PC, false }, { NULL, 0, false } diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c index b359084da..58c3ed15c 100644 --- a/src/backend/distributed/utils/multi_transaction.c +++ b/src/backend/distributed/utils/multi_transaction.c @@ -97,7 +97,7 @@ AbortTransactions(List *connectionList) ereport(WARNING, (errmsg("Failed to roll back prepared transaction '%s'", transactionName->data), errhint("Run ROLLBACK TRANSACTION '%s' on %s:%s", - transactionName->data, nodeName, nodePort))); + transactionName->data, nodeName, nodePort))); } PQclear(result); @@ -150,7 +150,7 @@ CommitTransactions(List *connectionList) ereport(WARNING, (errmsg("Failed to commit prepared transaction '%s'", transactionName->data), errhint("Run COMMIT TRANSACTION '%s' on %s:%s", - transactionName->data, nodeName, nodePort))); + transactionName->data, nodeName, nodePort))); } } else @@ -183,7 +183,7 @@ BuildTransactionName(int connectionId) StringInfo commandString = makeStringInfo(); appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid, - GetCurrentTransactionId(), connectionId); + GetCurrentTransactionId(), connectionId); return commandString; } diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 2c754e739..279b8c165 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -21,7 +21,7 @@ extern int CopyTransactionManager; /* function declarations for copying into a distributed table */ -extern void CitusCopyFrom(CopyStmt *copyStatement, char* completionTag); +extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); #endif /* MULTI_COPY_H */ diff --git a/src/include/distributed/multi_transaction.h b/src/include/distributed/multi_transaction.h index e3fe74ad8..827f0fcaa 100644 --- a/src/include/distributed/multi_transaction.h +++ b/src/include/distributed/multi_transaction.h @@ -43,7 +43,7 @@ typedef struct TransactionConnection { int64 connectionId; TransactionState transactionState; - PGconn* connection; + PGconn *connection; } TransactionConnection;