mirror of https://github.com/citusdata/citus.git
Add COPY support on master node for append partitioned relations
parent 991f73c4f4
commit 132a77f992
@@ -7,22 +7,28 @@
  * The CitusCopyFrom function should be called from the utility hook to
  * process COPY ... FROM commands on distributed tables. CitusCopyFrom
  * parses the input from stdin, a program executed on the master, or a file
- * on the master, and decides into which shard to put the data. It opens a
- * new connection for every shard placement and uses the PQputCopyData
- * function to copy the data. Because PQputCopyData transmits data
- * asynchronously, the workers will ingest data at least partially in
- * parallel.
+ * on the master, and decides to copy new rows to existing shards or new shards
+ * based on the partition method of the distributed table.
+ *
+ * It opens a new connection for every shard placement and uses the PQputCopyData
+ * function to copy the data. Because PQputCopyData transmits data asynchronously,
+ * the workers will ingest data at least partially in parallel.
+ *
  * When failing to connect to a worker, the master marks the placement for
  * which it was trying to open a connection as inactive, similar to the way
  * DML statements are handled. If a failure occurs after connecting, the
- * transaction is rolled back on all the workers.
+ * transaction is rolled back on all the workers. Note that if the underlying
+ * table is append-partitioned, metadata changes are rolled back on the master
+ * node, but shard placements are left on the workers.
  *
- * By default, COPY uses normal transactions on the workers. This can cause
- * a problem when some of the transactions fail to commit while others have
- * succeeded. To ensure no data is lost, COPY can use two-phase commit, by
- * increasing max_prepared_transactions on the worker and setting
- * citus.copy_transaction_manager to '2pc'. The default is '1pc'.
+ * By default, COPY uses normal transactions on the workers. In the case of
+ * hash or range-partitioned tables, this can cause a problem when some of the
+ * transactions fail to commit while others have succeeded. To ensure no data
+ * is lost, COPY can use two-phase commit, by increasing max_prepared_transactions
+ * on the worker and setting citus.copy_transaction_manager to '2pc'. The default
+ * is '1pc'. This is not a problem for append-partitioned tables because new
+ * shards are created and, in the case of failure, metadata changes are rolled
+ * back on the master node.
  *
  * Parsing options are processed and enforced on the master, while
  * constraints are enforced on the worker. In either case, failure causes
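For context on the first comment line above: the dispatch from the utility hook is not part of this diff. Below is a minimal sketch of how such a hook can route COPY ... FROM into CitusCopyFrom; the hook name and the IsDistributedTable() check are illustrative assumptions, and the signature matches the PostgreSQL 9.5-era ProcessUtility_hook this code targets.

#include "postgres.h"

#include "catalog/namespace.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "nodes/parsenodes.h"
#include "tcop/utility.h"

/*
 * Sketch only (not from this commit): route COPY ... FROM on distributed
 * tables into CitusCopyFrom from the utility hook. IsDistributedTable()
 * and the hook wiring are assumptions for illustration.
 */
static void
CitusProcessUtility(Node *parsetree, const char *queryString,
                    ProcessUtilityContext context, ParamListInfo params,
                    DestReceiver *dest, char *completionTag)
{
    if (IsA(parsetree, CopyStmt))
    {
        CopyStmt *copyStatement = (CopyStmt *) parsetree;

        /* only intercept COPY ... FROM, and only on distributed tables */
        if (copyStatement->is_from && copyStatement->relation != NULL)
        {
            Oid relationId = RangeVarGetRelid(copyStatement->relation,
                                              NoLock, true);

            if (OidIsValid(relationId) && IsDistributedTable(relationId))
            {
                CitusCopyFrom(copyStatement, completionTag);
                return;
            }
        }
    }

    /* anything else falls through to regular utility processing */
    standard_ProcessUtility(parsetree, queryString, context, params,
                            dest, completionTag);
}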
@@ -64,6 +70,7 @@
 #include "distributed/listutils.h"
 #include "distributed/master_metadata_utility.h"
+#include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_copy.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_transaction.h"
@@ -132,6 +139,8 @@ typedef struct ShardConnections
 
 
 /* Local functions forward declarations */
+static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
+static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag);
 static void LockAllShards(List *shardIntervalList);
 static HTAB * CreateShardConnectionHash(void);
 static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
@@ -153,6 +162,8 @@ static ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
                                               bool *shardConnectionsFound);
 static void OpenCopyTransactions(CopyStmt *copyStatement,
                                  ShardConnections *shardConnections);
+static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
+static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
 static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
 static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
 static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
@@ -161,6 +172,9 @@ static List * ConnectionList(HTAB *connectionHash);
 static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
 static void ReportCopyError(PGconn *connection, PGresult *result);
 static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
+static void StartCopyToNewShard(ShardConnections *shardConnections,
+                                Oid relationId, CopyStmt *copyStatement);
+static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
 
 /* Private functions copied and adapted from copy.c in PostgreSQL */
 static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@@ -173,17 +187,65 @@ static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *
 
 
 /*
- * CitusCopyFrom implements the COPY table_name FROM ... for hash-partitioned
- * and range-partitioned tables.
+ * CitusCopyFrom implements the COPY table_name FROM. It dispatches the copy
+ * statement to related subfunctions based on the partition method of the
+ * distributed table.
  */
 void
 CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 {
     Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
+    char partitionMethod = '\0';
+
+    /* disallow COPY to/from file or program except for superusers */
+    if (copyStatement->filename != NULL && !superuser())
+    {
+        if (copyStatement->is_program)
+        {
+            ereport(ERROR,
+                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                     errmsg("must be superuser to COPY to or from an external program"),
+                     errhint("Anyone can COPY to stdout or from stdin. "
+                             "psql's \\copy command also works for anyone.")));
+        }
+        else
+        {
+            ereport(ERROR,
+                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                     errmsg("must be superuser to COPY to or from a file"),
+                     errhint("Anyone can COPY to stdout or from stdin. "
+                             "psql's \\copy command also works for anyone.")));
+        }
+    }
+
+    partitionMethod = PartitionMethod(tableId);
+    if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
+    {
+        CopyToExistingShards(copyStatement, completionTag);
+    }
+    else if (partitionMethod == DISTRIBUTE_BY_APPEND)
+    {
+        CopyToNewShards(copyStatement, completionTag);
+    }
+    else
+    {
+        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("unsupported partition method")));
+    }
+}
+
+
+/*
+ * CopyToExistingShards implements the COPY table_name FROM ... for hash or
+ * range-partitioned tables where there are already shards into which to copy
+ * rows.
+ */
+static void
+CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
+{
+    Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
     char *relationName = get_rel_name(tableId);
     Relation distributedRelation = NULL;
-    char partitionMethod = '\0';
-    Var *partitionColumn = NULL;
     TupleDesc tupleDescriptor = NULL;
     uint32 columnCount = 0;
     Datum *columnValues = NULL;
@@ -210,35 +272,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
     FmgrInfo *columnOutputFunctions = NULL;
     uint64 processedRowCount = 0;
 
-    /* disallow COPY to/from file or program except for superusers */
-    if (copyStatement->filename != NULL && !superuser())
-    {
-        if (copyStatement->is_program)
-        {
-            ereport(ERROR,
-                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                     errmsg("must be superuser to COPY to or from an external program"),
-                     errhint("Anyone can COPY to stdout or from stdin. "
-                             "psql's \\copy command also works for anyone.")));
-        }
-        else
-        {
-            ereport(ERROR,
-                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                     errmsg("must be superuser to COPY to or from a file"),
-                     errhint("Anyone can COPY to stdout or from stdin. "
-                             "psql's \\copy command also works for anyone.")));
-        }
-    }
-
-    partitionColumn = PartitionColumn(tableId, 0);
-    partitionMethod = PartitionMethod(tableId);
-    if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH)
-    {
-        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("COPY is only supported for hash- and "
-                               "range-partitioned tables")));
-    }
+    Var *partitionColumn = PartitionColumn(tableId, 0);
+    char partitionMethod = PartitionMethod(tableId);
 
     /* resolve hash function for partition column */
     typeEntry = lookup_type_cache(partitionColumn->vartype, TYPECACHE_HASH_PROC_FINFO);
@@ -310,7 +345,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 
     /*
      * Create a mapping of shard id to a connection for each of its placements.
-     * The hash should be initialized before the PG_TRY, since it is used and
+     * The hash should be initialized before the PG_TRY, since it is used in
      * PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp
      * documentation).
      */
@@ -391,11 +426,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
             /* open connections and initiate COPY on shard placements */
             OpenCopyTransactions(copyStatement, shardConnections);
 
-            /* send binary headers to shard placements */
-            resetStringInfo(copyOutState->fe_msgbuf);
-            AppendCopyBinaryHeaders(copyOutState);
-            SendCopyDataToAll(copyOutState->fe_msgbuf,
-                              shardConnections->connectionList);
+            /* send copy binary headers to shard placements */
+            SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
         }
 
         /* replicate row to shard placements */
@@ -409,10 +441,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 
         connectionList = ConnectionList(shardConnectionHash);
 
-        /* send binary footers to all shard placements */
-        resetStringInfo(copyOutState->fe_msgbuf);
-        AppendCopyBinaryFooters(copyOutState);
-        SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+        /* send copy binary footers to all shard placements */
+        SendCopyBinaryFooters(copyOutState, connectionList);
 
         /* all lines have been copied, stop showing line number in errors */
         error_context_stack = errorCallback.previous;
@@ -450,7 +480,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
      * we do not want any of the transactions rolled back if a failure occurs. Instead,
      * they should be rolled forward.
      */
-    CommitRemoteTransactions(connectionList);
+    CommitRemoteTransactions(connectionList, false);
     CloseConnections(connectionList);
 
     if (completionTag != NULL)
@@ -461,6 +491,171 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 }
 
 
+/*
+ * CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
+ * tables where we create new shards into which to copy rows.
+ */
+static void
+CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
+{
+    Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
+    FmgrInfo *columnOutputFunctions = NULL;
+
+    /* allocate column values and nulls arrays */
+    Relation distributedRelation = heap_open(relationId, RowExclusiveLock);
+    TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
+    uint32 columnCount = tupleDescriptor->natts;
+    Datum *columnValues = palloc0(columnCount * sizeof(Datum));
+    bool *columnNulls = palloc0(columnCount * sizeof(bool));
+
+    EState *executorState = CreateExecutorState();
+    MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
+    ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
+
+    /*
+     * Shard connections should be initialized before the PG_TRY, since they are
+     * used in PG_CATCH. Otherwise, they may be undefined in the PG_CATCH
+     * (see sigsetjmp documentation).
+     */
+    ShardConnections *shardConnections =
+        (ShardConnections *) palloc0(sizeof(ShardConnections));
+
+    /* initialize copy state to read from COPY data source */
+    CopyState copyState = BeginCopyFrom(distributedRelation,
+                                        copyStatement->filename,
+                                        copyStatement->is_program,
+                                        copyStatement->attlist,
+                                        copyStatement->options);
+
+    CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
+    copyOutState->binary = true;
+    copyOutState->fe_msgbuf = makeStringInfo();
+    copyOutState->rowcontext = executorTupleContext;
+
+    columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
+
+    /* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
+    PG_TRY();
+    {
+        uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
+        uint64 copiedDataSizeInBytes = 0;
+        uint64 processedRowCount = 0;
+
+        /* set up callback to identify error line number */
+        ErrorContextCallback errorCallback;
+
+        errorCallback.callback = CopyFromErrorCallback;
+        errorCallback.arg = (void *) copyState;
+        errorCallback.previous = error_context_stack;
+
+        while (true)
+        {
+            bool nextRowFound = false;
+            MemoryContext oldContext = NULL;
+            uint64 messageBufferSize = 0;
+
+            ResetPerTupleExprContext(executorState);
+
+            /* switch to tuple memory context and start showing line number in errors */
+            error_context_stack = &errorCallback;
+            oldContext = MemoryContextSwitchTo(executorTupleContext);
+
+            /* parse a row from the input */
+            nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
+                                        columnValues, columnNulls, NULL);
+
+            if (!nextRowFound)
+            {
+                MemoryContextSwitchTo(oldContext);
+                break;
+            }
+
+            CHECK_FOR_INTERRUPTS();
+
+            /* switch to regular memory context and stop showing line number in errors */
+            MemoryContextSwitchTo(oldContext);
+            error_context_stack = errorCallback.previous;
+
+            /*
+             * If copied data size is zero, this means either this is the first
+             * line in the copy or we just filled the previous shard up to its
+             * capacity. Either way, we need to create a new shard and
+             * start copying new rows into it.
+             */
+            if (copiedDataSizeInBytes == 0)
+            {
+                /* create shard and open connections to shard placements */
+                StartCopyToNewShard(shardConnections, relationId, copyStatement);
+
+                /* send copy binary headers to shard placements */
+                SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
+            }
+
+            /* replicate row to shard placements */
+            resetStringInfo(copyOutState->fe_msgbuf);
+            AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
+                              copyOutState, columnOutputFunctions);
+            SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList);
+
+            messageBufferSize = copyOutState->fe_msgbuf->len;
+            copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize;
+
+            /*
+             * If we filled up this shard to its capacity, send copy binary footers
+             * to shard placements, commit copy transactions, close connections
+             * and finally update shard statistics.
+             */
+            if (copiedDataSizeInBytes > shardMaxSizeInBytes)
+            {
+                SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
+                FinalizeCopyToNewShard(shardConnections);
+                UpdateShardStatistics(relationId, shardConnections->shardId);
+
+                copiedDataSizeInBytes = 0;
+            }
+
+            processedRowCount += 1;
+        }
+
+        /*
+         * For the last shard, send copy binary footers to shard placements,
+         * commit copy transactions, close connections and finally update shard
+         * statistics. If no rows were sent, there is no shard to finalize.
+         */
+        if (copiedDataSizeInBytes > 0)
+        {
+            SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
+            FinalizeCopyToNewShard(shardConnections);
+            UpdateShardStatistics(relationId, shardConnections->shardId);
+        }
+
+        EndCopyFrom(copyState);
+        heap_close(distributedRelation, NoLock);
+
+        /* check for cancellation one last time before returning */
+        CHECK_FOR_INTERRUPTS();
+
+        if (completionTag != NULL)
+        {
+            snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
+                     "COPY " UINT64_FORMAT, processedRowCount);
+        }
+    }
+    PG_CATCH();
+    {
+        /* roll back all transactions */
+        EndRemoteCopy(shardConnections->connectionList, false);
+        AbortRemoteTransactions(shardConnections->connectionList);
+        CloseConnections(shardConnections->connectionList);
+
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+}
+
+
 /*
  * LockAllShards takes shared locks on the metadata and the data of all shards in
  * shardIntervalList. This prevents concurrent placement changes and concurrent
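The rotation policy in CopyToNewShards is driven purely by the accumulated fe_msgbuf byte counts. Below is a self-contained sketch of the same accumulate-and-rotate arithmetic, with made-up row sizes; the 256kB limit mirrors the citus.shard_max_size setting used in the regression test later in this diff.

#include <stdint.h>
#include <stdio.h>

/*
 * Standalone sketch of the shard rotation arithmetic above: start a shard
 * when the running byte count is zero, and reset the count (finalizing the
 * shard) once it exceeds the maximum. Row sizes here are made up.
 */
int
main(void)
{
    uint64_t shardMaxSizeInBytes = 256 * 1024;  /* citus.shard_max_size = '256kB' */
    uint64_t copiedDataSizeInBytes = 0;
    int shardCount = 0;

    for (int rowIndex = 0; rowIndex < 20000; rowIndex++)
    {
        uint64_t messageBufferSize = 100;       /* pretend each row serializes to 100 bytes */

        if (copiedDataSizeInBytes == 0)
        {
            shardCount++;                       /* StartCopyToNewShard() */
        }

        copiedDataSizeInBytes += messageBufferSize;

        if (copiedDataSizeInBytes > shardMaxSizeInBytes)
        {
            copiedDataSizeInBytes = 0;          /* FinalizeCopyToNewShard() */
        }
    }

    /* a non-zero remainder means one last, partially filled shard */
    printf("%d shards for 20000 rows of 100 bytes\n", shardCount);
    return 0;
}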
@@ -826,6 +1021,26 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 }
 
 
+/* Send copy binary headers to given connections */
+static void
+SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList)
+{
+    resetStringInfo(copyOutState->fe_msgbuf);
+    AppendCopyBinaryHeaders(copyOutState);
+    SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+}
+
+
+/* Send copy binary footers to given connections */
+static void
+SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
+{
+    resetStringInfo(copyOutState->fe_msgbuf);
+    AppendCopyBinaryFooters(copyOutState);
+    SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+}
+
+
 /*
  * ConstructCopyStatement constructs the text of a COPY statement for a particular
  * shard.
@@ -1060,13 +1275,12 @@ void
 AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
                   CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions)
 {
-    MemoryContext oldContext = NULL;
     uint32 totalColumnCount = (uint32) rowDescriptor->natts;
-    uint32 columnIndex = 0;
     uint32 availableColumnCount = AvailableColumnCount(rowDescriptor);
     uint32 appendedColumnCount = 0;
+    uint32 columnIndex = 0;
 
-    oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
+    MemoryContext oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
 
     if (rowOutputState->binary)
     {
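This hunk and the AppendCopyBinary* hunks that follow all apply the same PostgreSQL memory-context discipline: serialize into the short-lived rowcontext and restore the caller's context afterwards, so per-row allocations are reclaimed in bulk when that context is reset. A minimal sketch of the pattern (the function below is illustrative, not from this commit):

#include "postgres.h"
#include "utils/memutils.h"

/*
 * Sketch of the context-switch pattern used by the AppendCopy* functions:
 * allocate row-lifetime data in rowContext, then restore the old context.
 * The empty body stands in for the CopySend* calls.
 */
static void
SerializeInRowContext(MemoryContext rowContext)
{
    MemoryContext oldContext = MemoryContextSwitchTo(rowContext);

    /* ...CopySend*-style allocations land in rowContext here... */

    MemoryContextSwitchTo(oldContext);
}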
@@ -1170,6 +1384,7 @@ void
 AppendCopyBinaryHeaders(CopyOutState headerOutputState)
 {
     const int32 zero = 0;
+    MemoryContext oldContext = MemoryContextSwitchTo(headerOutputState->rowcontext);
 
     /* Signature */
     CopySendData(headerOutputState, BinarySignature, 11);
@@ -1179,6 +1394,8 @@ AppendCopyBinaryHeaders(CopyOutState headerOutputState)
 
     /* No header extension */
     CopySendInt32(headerOutputState, zero);
+
+    MemoryContextSwitchTo(oldContext);
 }
 
 
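For reference, the framing that AppendCopyBinaryHeaders above and AppendCopyBinaryFooters in the next hunk emit is fixed by PostgreSQL's COPY BINARY format: an 11-byte signature, a 4-byte flags field, and a 4-byte header-extension length up front, then a 16-bit -1 trailer. A standalone sketch that writes the same framing with plain stdio instead of a CopyOutState:

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Sketch of the COPY BINARY framing the helpers emit; the field values
 * follow the PostgreSQL COPY documentation.
 */
static void
WriteCopyBinaryFraming(FILE *out)
{
    static const char signature[11] = "PGCOPY\n\377\r\n\0";
    uint32_t flags = htonl(0);               /* no OIDs */
    uint32_t extensionLength = htonl(0);     /* no header extension */
    uint16_t trailer = htons((uint16_t) -1); /* -1 field count ends the stream */

    fwrite(signature, 1, 11, out);           /* AppendCopyBinaryHeaders */
    fwrite(&flags, 4, 1, out);
    fwrite(&extensionLength, 4, 1, out);

    /* ...binary tuples would be written here... */

    fwrite(&trailer, 2, 1, out);             /* AppendCopyBinaryFooters */
}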
@@ -1190,8 +1407,52 @@ void
 AppendCopyBinaryFooters(CopyOutState footerOutputState)
 {
     int16 negative = -1;
+    MemoryContext oldContext = MemoryContextSwitchTo(footerOutputState->rowcontext);
 
     CopySendInt16(footerOutputState, negative);
+
+    MemoryContextSwitchTo(oldContext);
 }
+
+
+/*
+ * StartCopyToNewShard creates a new shard and related shard placements and
+ * opens connections to shard placements.
+ */
+static void
+StartCopyToNewShard(ShardConnections *shardConnections, Oid relationId,
+                    CopyStmt *copyStatement)
+{
+    char *relationName = get_rel_name(relationId);
+    text *relationNameText = cstring_to_text(relationName);
+    Datum relationNameDatum = PointerGetDatum(relationNameText);
+    Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
+                                             relationNameDatum);
+
+    int64 shardId = DatumGetInt64(shardIdDatum);
+    shardConnections->shardId = shardId;
+
+    list_free_deep(shardConnections->connectionList);
+    shardConnections->connectionList = NIL;
+
+    /* connect to shard placements and start transactions */
+    OpenCopyTransactions(copyStatement, shardConnections);
+}
+
+
+/*
+ * FinalizeCopyToNewShard commits the copy transactions and closes connections
+ * to shard placements.
+ */
+static void
+FinalizeCopyToNewShard(ShardConnections *shardConnections)
+{
+    /* close the COPY input on all shard placements */
+    EndRemoteCopy(shardConnections->connectionList, true);
+
+    /* commit transactions and close connections */
+    CommitRemoteTransactions(shardConnections->connectionList, true);
+    CloseConnections(shardConnections->connectionList);
+}
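StartCopyToNewShard reuses the SQL-facing master_create_empty_shard UDF through fmgr's DirectFunctionCall1 rather than duplicating its logic. The same mechanism works for any fmgr-callable function; a small sketch against the built-in textlen() follows (the textlen() declaration comes from utils/builtins.h in the PostgreSQL 9.5-era headers this code targets):

#include "postgres.h"
#include "fmgr.h"
#include "utils/builtins.h"

/*
 * Sketch: DirectFunctionCall1 invokes an fmgr-callable function directly,
 * the way StartCopyToNewShard calls master_create_empty_shard. Here the
 * built-in textlen() is used as a stand-in target.
 */
static int32
TextLengthViaFmgr(text *value)
{
    Datum lengthDatum = DirectFunctionCall1(textlen, PointerGetDatum(value));

    return DatumGetInt32(lengthDatum);
}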
@@ -211,7 +211,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
  * We mark shard placements that we couldn't drop as to be deleted later, but
  * we do delete the shard metadata.
  */
-int
+static int
 DropShards(Oid relationId, char *schemaName, char *relationName,
            List *deletableShardIntervalList)
 {
@@ -43,7 +43,7 @@
 static bool WorkerCreateShard(char *nodeName, uint32 nodePort,
                               uint64 shardId, List *ddlCommandList);
 static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
-                             char *shardName, uint64 *shardLength,
+                             char *shardName, uint64 *shardSize,
                              text **shardMinValue, text **shardMaxValue);
 static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId,
                               char *tableName);
@@ -159,14 +159,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
     List *succeededPlacementList = NIL;
     List *failedPlacementList = NIL;
     ListCell *shardPlacementCell = NULL;
-    ListCell *succeededPlacementCell = NULL;
     ListCell *failedPlacementCell = NULL;
-    bool statsOK = false;
-    uint64 newShardLength = 0;
+    uint64 newShardSize = 0;
     uint64 shardMaxSizeInBytes = 0;
     float4 shardFillLevel = 0.0;
-    text *newMinValue = NULL;
-    text *newMaxValue = NULL;
     char partitionMethod = 0;
 
     ShardInterval *shardInterval = LoadShardInterval(shardId);
@@ -264,64 +260,12 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 
     RESUME_INTERRUPTS();
 
-    /* get appended shard's statistics from a shard placement */
-    foreach(succeededPlacementCell, succeededPlacementList)
-    {
-        ShardPlacement *placement = (ShardPlacement *) lfirst(succeededPlacementCell);
-        char *workerName = placement->nodeName;
-        uint32 workerPort = placement->nodePort;
-
-        statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
-                                   &newShardLength, &newMinValue, &newMaxValue);
-        if (statsOK)
-        {
-            break;
-        }
-    }
-
-    /*
-     * If for some reason we appended data to a shard, but failed to retrieve
-     * statistics we just WARN here to avoid losing shard-state updates. Note
-     * that this means we will return 0 as the shard fill-factor, and this shard
-     * also won't be pruned as the statistics will be empty. If the failure was
-     * transient, a subsequent append call will fetch the correct statistics.
-     */
-    if (!statsOK)
-    {
-        ereport(WARNING, (errmsg("could not get statistics for shard placement"),
-                          errdetail("Setting shard statistics to NULL")));
-    }
-
-    /* make sure we don't process cancel signals */
-    HOLD_INTERRUPTS();
-
-    /* update metadata for each shard placement we appended to */
-    succeededPlacementCell = NULL;
-    foreach(succeededPlacementCell, succeededPlacementList)
-    {
-        ShardPlacement *placement = (ShardPlacement *) lfirst(succeededPlacementCell);
-        char *workerName = placement->nodeName;
-        uint32 workerPort = placement->nodePort;
-
-        DeleteShardPlacementRow(shardId, workerName, workerPort);
-        InsertShardPlacementRow(shardId, FILE_FINALIZED, newShardLength,
-                                workerName, workerPort);
-    }
-
-    DeleteShardRow(shardId);
-    InsertShardRow(relationId, shardId, storageType, newMinValue, newMaxValue);
-
-    if (QueryCancelPending)
-    {
-        ereport(WARNING, (errmsg("cancel requests are ignored during table appends")));
-        QueryCancelPending = false;
-    }
-
-    RESUME_INTERRUPTS();
+    /* update shard statistics and get new shard size */
+    newShardSize = UpdateShardStatistics(relationId, shardId);
 
     /* calculate ratio of current shard size compared to shard max size */
     shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
-    shardFillLevel = ((float4) newShardLength / (float4) shardMaxSizeInBytes);
+    shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
 
     PG_RETURN_FLOAT4(shardFillLevel);
 }
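master_append_table_to_shard thus reduces to: append the data, refresh metadata through UpdateShardStatistics (next hunk), and report how full the shard is. A worked example of the returned fill level, with made-up sizes (the 1GB limit assumes citus.shard_max_size's default):

#include <stdint.h>
#include <stdio.h>

/*
 * Worked example of the fill-level arithmetic above. The sizes are made
 * up; shardMaxSizeInBytes assumes citus.shard_max_size's 1GB default.
 */
int
main(void)
{
    uint64_t shardMaxSizeInBytes = (uint64_t) 1048576 * 1024;  /* 1GB in bytes */
    uint64_t newShardSize = (uint64_t) 300 * 1024 * 1024;      /* 300MB reported by a worker */
    float shardFillLevel = (float) newShardSize / (float) shardMaxSizeInBytes;

    printf("fill level: %.3f\n", shardFillLevel);              /* prints 0.293 */
    return 0;
}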
@@ -446,13 +390,99 @@ WorkerCreateShard(char *nodeName, uint32 nodePort,
 }
 
 
+/*
+ * UpdateShardStatistics updates metadata for the given shard id and returns
+ * the new shard size.
+ */
+uint64
+UpdateShardStatistics(Oid relationId, int64 shardId)
+{
+    ShardInterval *shardInterval = LoadShardInterval(shardId);
+    char storageType = shardInterval->storageType;
+    char *shardName = NULL;
+    List *shardPlacementList = NIL;
+    ListCell *shardPlacementCell = NULL;
+    bool statsOK = false;
+    uint64 shardSize = 0;
+    text *minValue = NULL;
+    text *maxValue = NULL;
+
+    /* if shard doesn't have an alias, extend regular table name */
+    shardName = LoadShardAlias(relationId, shardId);
+    if (shardName == NULL)
+    {
+        shardName = get_rel_name(relationId);
+        AppendShardIdToName(&shardName, shardId);
+    }
+
+    shardPlacementList = FinalizedShardPlacementList(shardId);
+
+    /* get shard's statistics from a shard placement */
+    foreach(shardPlacementCell, shardPlacementList)
+    {
+        ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+        char *workerName = placement->nodeName;
+        uint32 workerPort = placement->nodePort;
+
+        statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
+                                   &shardSize, &minValue, &maxValue);
+        if (statsOK)
+        {
+            break;
+        }
+    }
+
+    /*
+     * If for some reason we appended data to a shard, but failed to retrieve
+     * statistics we just WARN here to avoid losing shard-state updates. Note
+     * that this means we will return 0 as the shard fill-factor, and this shard
+     * also won't be pruned as the statistics will be empty. If the failure was
+     * transient, a subsequent append call will fetch the correct statistics.
+     */
+    if (!statsOK)
+    {
+        ereport(WARNING, (errmsg("could not get statistics for shard %s", shardName),
+                          errdetail("Setting shard statistics to NULL")));
+    }
+
+    /* make sure we don't process cancel signals */
+    HOLD_INTERRUPTS();
+
+    /* update metadata for each shard placement we appended to */
+    shardPlacementCell = NULL;
+    foreach(shardPlacementCell, shardPlacementList)
+    {
+        ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+        char *workerName = placement->nodeName;
+        uint32 workerPort = placement->nodePort;
+
+        DeleteShardPlacementRow(shardId, workerName, workerPort);
+        InsertShardPlacementRow(shardId, FILE_FINALIZED, shardSize,
+                                workerName, workerPort);
+    }
+
+    DeleteShardRow(shardId);
+    InsertShardRow(relationId, shardId, storageType, minValue, maxValue);
+
+    if (QueryCancelPending)
+    {
+        ereport(WARNING, (errmsg("cancel requests are ignored during metadata update")));
+        QueryCancelPending = false;
+    }
+
+    RESUME_INTERRUPTS();
+
+    return shardSize;
+}
+
+
 /*
  * WorkerShardStats queries the worker node, and retrieves shard statistics that
  * we assume have changed after new table data have been appended to the shard.
  */
 static bool
 WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName,
-                 uint64 *shardLength, text **shardMinValue, text **shardMaxValue)
+                 uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
 {
     bool shardStatsOK = true;
 
@@ -464,7 +494,7 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
         StringInfo maxValue = WorkerPartitionValue(nodeName, nodePort, relationId,
                                                    shardName, SHARD_MAX_VALUE_QUERY);
 
-        (*shardLength) = tableSize;
+        (*shardSize) = tableSize;
         (*shardMinValue) = cstring_to_text_with_len(minValue->data, minValue->len);
         (*shardMaxValue) = cstring_to_text_with_len(maxValue->data, maxValue->len);
     }
@@ -135,10 +135,15 @@ AbortRemoteTransactions(List *connectionList)
 
 /*
  * CommitRemoteTransactions commits all transactions on connections in connectionList.
- * On failure, it reports a warning and continues committing all of them.
+ * If stopOnFailure is true, then CommitRemoteTransactions reports an error on
+ * failure, otherwise it reports a warning.
+ * Note that a caller that wants the transactions to roll back on a failing
+ * commit should pass stopOnFailure as true; a caller that does not should
+ * pass false.
  */
 void
-CommitRemoteTransactions(List *connectionList)
+CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
 {
     ListCell *connectionCell = NULL;
 
@@ -166,11 +171,24 @@ CommitRemoteTransactions(List *connectionList)
             char *nodeName = ConnectionGetOptionValue(connection, "host");
             char *nodePort = ConnectionGetOptionValue(connection, "port");
 
-            /* log a warning so the user may commit the transaction later */
-            ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
-                                     transactionName->data),
-                              errhint("Run \"%s\" on %s:%s",
-                                      command->data, nodeName, nodePort)));
+            /*
+             * If stopOnFailure is false, log a warning so the user may
+             * commit the transaction later.
+             */
+            if (stopOnFailure)
+            {
+                ereport(ERROR, (errmsg("failed to commit prepared transaction '%s'",
+                                       transactionName->data),
+                                errhint("Run \"%s\" on %s:%s",
+                                        command->data, nodeName, nodePort)));
+            }
+            else
+            {
+                ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
+                                         transactionName->data),
+                                  errhint("Run \"%s\" on %s:%s",
+                                          command->data, nodeName, nodePort)));
+            }
         }
     }
     else
@@ -178,15 +196,26 @@ CommitRemoteTransactions(List *connectionList)
         /* we shouldn't be committing if any transactions are not open */
         Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);
 
-        /* try to commit, if it fails then the user might lose data */
+        /*
+         * Try to commit; if it fails and stopOnFailure is false, the
+         * user might lose data.
+         */
         result = PQexec(connection, "COMMIT");
         if (PQresultStatus(result) != PGRES_COMMAND_OK)
         {
             char *nodeName = ConnectionGetOptionValue(connection, "host");
             char *nodePort = ConnectionGetOptionValue(connection, "port");
 
-            ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
-                                     nodeName, nodePort)));
+            if (stopOnFailure)
+            {
+                ereport(ERROR, (errmsg("failed to commit transaction on %s:%s",
+                                       nodeName, nodePort)));
+            }
+            else
+            {
+                ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
+                                         nodeName, nodePort)));
+            }
         }
     }
 
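The error/warning split above matters most under two-phase commit, where PrepareRemoteTransactions has already run PREPARE TRANSACTION on each worker. A simplified libpq sketch of the per-connection commit step (connection setup and error reporting are pared down; the transaction name is made up):

#include <libpq-fe.h>

/*
 * Simplified sketch of the per-connection 2PC commit behind
 * CommitRemoteTransactions when citus.copy_transaction_manager is '2pc'.
 * Returns 1 on success; a failed COMMIT PREPARED leaves the prepared
 * transaction behind so it can be committed later by hand.
 */
static int
CommitPreparedTransaction(PGconn *connection)
{
    PGresult *result = PQexec(connection, "COMMIT PREPARED 'citus_copy_42'");
    int commitOK = (PQresultStatus(result) == PGRES_COMMAND_OK);

    PQclear(result);
    return commitOK;
}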
@@ -85,6 +85,7 @@ extern void CheckDistributedTable(Oid relationId);
 extern void CreateShardPlacements(int64 shardId, List *ddlEventList,
                                   List *workerNodeList, int workerStartIndex,
                                   int replicationFactor);
+extern uint64 UpdateShardStatistics(Oid relationId, int64 shardId);
 
 /* Function declarations for generating metadata for shard creation */
 extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
@@ -51,7 +51,7 @@ typedef struct TransactionConnection
 extern void InitializeDistributedTransaction(void);
 extern void PrepareRemoteTransactions(List *connectionList);
 extern void AbortRemoteTransactions(List *connectionList);
-extern void CommitRemoteTransactions(List *connectionList);
+extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
 extern void CloseConnections(List *connectionList);
 
 
@@ -1,6 +1,7 @@
 --
 -- MULTI_COPY
 --
+
 -- Create a new hash-partitioned table into which to COPY
 CREATE TABLE customer_copy_hash (
 	c_custkey integer,
@@ -188,3 +189,68 @@ FROM customer_copy_range WHERE c_custkey <= 500;
 
 -- Check whether data was copied
 SELECT count(*) FROM customer_copy_range;
+
+-- Create a new append-partitioned table into which to COPY
+CREATE TABLE customer_copy_append (
+	c_custkey integer,
+	c_name varchar(25) not null,
+	c_address varchar(40),
+	c_nationkey integer,
+	c_phone char(15),
+	c_acctbal decimal(15,2),
+	c_mktsegment char(10),
+	c_comment varchar(117));
+SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
+
+-- Test syntax error
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+notinteger,customernot
+\.
+
+-- Test that no shard is created for failing copy
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+
+-- Test empty copy
+COPY customer_copy_append FROM STDIN;
+\.
+
+-- Test that no shard is created for copying zero rows
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+
+-- Test proper copy
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+\.
+
+-- Check whether data was copied properly
+SELECT * FROM customer_copy_append;
+
+-- Create lineitem table
+CREATE TABLE lineitem_copy_append (
+	l_orderkey bigint not null,
+	l_partkey integer not null,
+	l_suppkey integer not null,
+	l_linenumber integer not null,
+	l_quantity decimal(15, 2) not null,
+	l_extendedprice decimal(15, 2) not null,
+	l_discount decimal(15, 2) not null,
+	l_tax decimal(15, 2) not null,
+	l_returnflag char(1) not null,
+	l_linestatus char(1) not null,
+	l_shipdate date not null,
+	l_commitdate date not null,
+	l_receiptdate date not null,
+	l_shipinstruct char(25) not null,
+	l_shipmode char(10) not null,
+	l_comment varchar(44) not null);
+SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
+
+-- Test multiple shard creation
+SET citus.shard_max_size TO '256kB';
+
+COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
+
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
@@ -225,3 +225,82 @@ SELECT count(*) FROM customer_copy_range;
  1000
 (1 row)
 
+-- Create a new append-partitioned table into which to COPY
+CREATE TABLE customer_copy_append (
+	c_custkey integer,
+	c_name varchar(25) not null,
+	c_address varchar(40),
+	c_nationkey integer,
+	c_phone char(15),
+	c_acctbal decimal(15,2),
+	c_mktsegment char(10),
+	c_comment varchar(117));
+SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test syntax error
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+ERROR:  invalid input syntax for integer: "notinteger"
+CONTEXT:  COPY customer_copy_append, line 3, column c_custkey: "notinteger"
+-- Test that no shard is created for failing copy
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+ count
+-------
+     0
+(1 row)
+
+-- Test empty copy
+COPY customer_copy_append FROM STDIN;
+-- Test that no shard is created for copying zero rows
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+ count
+-------
+     0
+(1 row)
+
+-- Test proper copy
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+-- Check whether data was copied properly
+SELECT * FROM customer_copy_append;
+ c_custkey |  c_name   | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
+-----------+-----------+-----------+-------------+---------+-----------+--------------+-----------
+         1 | customer1 |           |             |         |           |              |
+         2 | customer2 |           |             |         |           |              |
+(2 rows)
+
+-- Create lineitem table
+CREATE TABLE lineitem_copy_append (
+	l_orderkey bigint not null,
+	l_partkey integer not null,
+	l_suppkey integer not null,
+	l_linenumber integer not null,
+	l_quantity decimal(15, 2) not null,
+	l_extendedprice decimal(15, 2) not null,
+	l_discount decimal(15, 2) not null,
+	l_tax decimal(15, 2) not null,
+	l_returnflag char(1) not null,
+	l_linestatus char(1) not null,
+	l_shipdate date not null,
+	l_commitdate date not null,
+	l_receiptdate date not null,
+	l_shipinstruct char(25) not null,
+	l_shipmode char(10) not null,
+	l_comment varchar(44) not null);
+SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test multiple shard creation
+SET citus.shard_max_size TO '256kB';
+COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
+ count
+-------
+     5
+(1 row)