Merge pull request #445 from citusdata/feature/master-copy-for-append-partitioning

Add COPY support on master node for append partitioned relations

CR: @marcocitus
pull/459/head
Metin Döşlü 2016-04-19 22:08:02 +03:00
commit 339e1364d5
8 changed files with 599 additions and 133 deletions

View File

@ -7,22 +7,28 @@
* The CitusCopyFrom function should be called from the utility hook to
* process COPY ... FROM commands on distributed tables. CitusCopyFrom
* parses the input from stdin, a program executed on the master, or a file
* on the master, and decides into which shard to put the data. It opens a
* new connection for every shard placement and uses the PQputCopyData
* function to copy the data. Because PQputCopyData transmits data,
* asynchronously, the workers will ingest data at least partially in
* parallel.
* on the master, and decides to copy new rows to existing shards or new shards
* based on the partition method of the distributed table.
*
* It opens a new connection for every shard placement and uses the PQputCopyData
* function to copy the data. Because PQputCopyData transmits data, asynchronously,
* the workers will ingest data at least partially in parallel.
*
* When failing to connect to a worker, the master marks the placement for
* which it was trying to open a connection as inactive, similar to the way
* DML statements are handled. If a failure occurs after connecting, the
* transaction is rolled back on all the workers.
* transaction is rolled back on all the workers. Note that, if the underlying
* table is append-partitioned, metadata changes are rolled back on the master
* node, but shard placements are left on the workers.
*
* By default, COPY uses normal transactions on the workers. This can cause
* a problem when some of the transactions fail to commit while others have
* succeeded. To ensure no data is lost, COPY can use two-phase commit, by
* increasing max_prepared_transactions on the worker and setting
* citus.copy_transaction_manager to '2pc'. The default is '1pc'.
* By default, COPY uses normal transactions on the workers. In the case of
* hash or range-partitioned tables, this can cause a problem when some of the
* transactions fail to commit while others have succeeded. To ensure no data
* is lost, COPY can use two-phase commit, by increasing max_prepared_transactions
* on the worker and setting citus.copy_transaction_manager to '2pc'. The default
* is '1pc'. This is not a problem for append-partitioned tables because new
* shards are created and in the case of failure, metadata changes are rolled
* back on the master node.
*
* Parsing options are processed and enforced on the master, while
* constraints are enforced on the worker. In either case, failure causes
@ -64,6 +70,7 @@
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_transaction.h"
@ -132,6 +139,8 @@ typedef struct ShardConnections
/* Local functions forward declarations */
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag);
static void LockAllShards(List *shardIntervalList);
static HTAB * CreateShardConnectionHash(void);
static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
@ -153,6 +162,8 @@ static ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
bool *shardConnectionsFound);
static void OpenCopyTransactions(CopyStmt *copyStatement,
ShardConnections *shardConnections);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
@ -161,6 +172,9 @@ static List * ConnectionList(HTAB *connectionHash);
static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static void StartCopyToNewShard(ShardConnections *shardConnections,
Oid relationId, CopyStmt *copyStatement);
static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
/* Private functions copied and adapted from copy.c in PostgreSQL */
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@ -173,17 +187,65 @@ static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *
/*
* CitusCopyFrom implements the COPY table_name FROM ... for hash-partitioned
* and range-partitioned tables.
* CitusCopyFrom implements the COPY table_name FROM. It dispacthes the copy
* statement to related subfunctions based on the partition method of the
* distributed table.
*/
void
CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
{
Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char partitionMethod = '\0';
/* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser())
{
if (copyStatement->is_program)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from an external program"),
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from a file"),
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
}
}
partitionMethod = PartitionMethod(tableId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
{
CopyToExistingShards(copyStatement, completionTag);
}
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
CopyToNewShards(copyStatement, completionTag);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported partition method")));
}
}
/*
* CopyToExistingShards implements the COPY table_name FROM ... for hash or
* range-partitioned tables where there are already shards into which to copy
* rows.
*/
static void
CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
{
Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char *relationName = get_rel_name(tableId);
Relation distributedRelation = NULL;
char partitionMethod = '\0';
Var *partitionColumn = NULL;
TupleDesc tupleDescriptor = NULL;
uint32 columnCount = 0;
Datum *columnValues = NULL;
@ -210,35 +272,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
FmgrInfo *columnOutputFunctions = NULL;
uint64 processedRowCount = 0;
/* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser())
{
if (copyStatement->is_program)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from an external program"),
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from a file"),
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
}
}
partitionColumn = PartitionColumn(tableId, 0);
partitionMethod = PartitionMethod(tableId);
if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY is only supported for hash- and "
"range-partitioned tables")));
}
Var *partitionColumn = PartitionColumn(tableId, 0);
char partitionMethod = PartitionMethod(tableId);
/* resolve hash function for partition column */
typeEntry = lookup_type_cache(partitionColumn->vartype, TYPECACHE_HASH_PROC_FINFO);
@ -310,7 +345,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
/*
* Create a mapping of shard id to a connection for each of its placements.
* The hash should be initialized before the PG_TRY, since it is used and
* The hash should be initialized before the PG_TRY, since it is used in
* PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp
* documentation).
*/
@ -391,11 +426,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
/* open connections and initiate COPY on shard placements */
OpenCopyTransactions(copyStatement, shardConnections);
/* send binary headers to shard placements */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf,
shardConnections->connectionList);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
}
/* replicate row to shard placements */
@ -409,10 +441,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
connectionList = ConnectionList(shardConnectionHash);
/* send binary footers to all shard placements */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
/* send copy binary footers to all shard placements */
SendCopyBinaryFooters(copyOutState, connectionList);
/* all lines have been copied, stop showing line number in errors */
error_context_stack = errorCallback.previous;
@ -450,7 +480,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
* we do not want any of the transactions rolled back if a failure occurs. Instead,
* they should be rolled forward.
*/
CommitRemoteTransactions(connectionList);
CommitRemoteTransactions(connectionList, false);
CloseConnections(connectionList);
if (completionTag != NULL)
@ -461,6 +491,171 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
}
/*
* CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
* tables where we create new shards into which to copy rows.
*/
static void
CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
{
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
FmgrInfo *columnOutputFunctions = NULL;
/* allocate column values and nulls arrays */
Relation distributedRelation = heap_open(relationId, RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
uint32 columnCount = tupleDescriptor->natts;
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
bool *columnNulls = palloc0(columnCount * sizeof(bool));
EState *executorState = CreateExecutorState();
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
/*
* Shard connections should be initialized before the PG_TRY, since it is
* used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH
* (see sigsetjmp documentation).
*/
ShardConnections *shardConnections =
(ShardConnections *) palloc0(sizeof(ShardConnections));
/* initialize copy state to read from COPY data source */
CopyState copyState = BeginCopyFrom(distributedRelation,
copyStatement->filename,
copyStatement->is_program,
copyStatement->attlist,
copyStatement->options);
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->binary = true;
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
PG_TRY();
{
uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
uint64 copiedDataSizeInBytes = 0;
uint64 processedRowCount = 0;
/* set up callback to identify error line number */
ErrorContextCallback errorCallback;
errorCallback.callback = CopyFromErrorCallback;
errorCallback.arg = (void *) copyState;
errorCallback.previous = error_context_stack;
while (true)
{
bool nextRowFound = false;
MemoryContext oldContext = NULL;
uint64 messageBufferSize = 0;
ResetPerTupleExprContext(executorState);
/* switch to tuple memory context and start showing line number in errors */
error_context_stack = &errorCallback;
oldContext = MemoryContextSwitchTo(executorTupleContext);
/* parse a row from the input */
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
columnValues, columnNulls, NULL);
if (!nextRowFound)
{
MemoryContextSwitchTo(oldContext);
break;
}
CHECK_FOR_INTERRUPTS();
/* switch to regular memory context and stop showing line number in errors */
MemoryContextSwitchTo(oldContext);
error_context_stack = errorCallback.previous;
/*
* If copied data size is zero, this means either this is the first
* line in the copy or we just filled the previous shard up to its
* capacity. Either way, we need to create a new shard and
* start copying new rows into it.
*/
if (copiedDataSizeInBytes == 0)
{
/* create shard and open connections to shard placements */
StartCopyToNewShard(shardConnections, relationId, copyStatement);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
}
/* replicate row to shard placements */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions);
SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList);
messageBufferSize = copyOutState->fe_msgbuf->len;
copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize;
/*
* If we filled up this shard to its capacity, send copy binary footers
* to shard placements, commit copy transactions, close connections
* and finally update shard statistics.
*
* */
if (copiedDataSizeInBytes > shardMaxSizeInBytes)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
FinalizeCopyToNewShard(shardConnections);
UpdateShardStatistics(relationId, shardConnections->shardId);
copiedDataSizeInBytes = 0;
}
processedRowCount += 1;
}
/*
* For the last shard, send copy binary footers to shard placements,
* commit copy transactions, close connections and finally update shard
* statistics. If no row is send, there is no shard to finalize the
* copy command.
*/
if (copiedDataSizeInBytes > 0)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
FinalizeCopyToNewShard(shardConnections);
UpdateShardStatistics(relationId, shardConnections->shardId);
}
EndCopyFrom(copyState);
heap_close(distributedRelation, NoLock);
/* check for cancellation one last time before returning */
CHECK_FOR_INTERRUPTS();
if (completionTag != NULL)
{
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processedRowCount);
}
}
PG_CATCH();
{
/* roll back all transactions */
EndRemoteCopy(shardConnections->connectionList, false);
AbortRemoteTransactions(shardConnections->connectionList);
CloseConnections(shardConnections->connectionList);
PG_RE_THROW();
}
PG_END_TRY();
}
/*
* LockAllShards takes shared locks on the metadata and the data of all shards in
* shardIntervalList. This prevents concurrent placement changes and concurrent
@ -826,6 +1021,26 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
}
/* Send copy binary headers to given connections */
static void
SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList)
{
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
}
/* Send copy binary footers to given connections */
static void
SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
{
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
}
/*
* ConstructCopyStatement constructs the text of a COPY statement for a particular
* shard.
@ -1060,13 +1275,12 @@ void
AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions)
{
MemoryContext oldContext = NULL;
uint32 totalColumnCount = (uint32) rowDescriptor->natts;
uint32 columnIndex = 0;
uint32 availableColumnCount = AvailableColumnCount(rowDescriptor);
uint32 appendedColumnCount = 0;
uint32 columnIndex = 0;
oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
MemoryContext oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
if (rowOutputState->binary)
{
@ -1170,6 +1384,7 @@ void
AppendCopyBinaryHeaders(CopyOutState headerOutputState)
{
const int32 zero = 0;
MemoryContext oldContext = MemoryContextSwitchTo(headerOutputState->rowcontext);
/* Signature */
CopySendData(headerOutputState, BinarySignature, 11);
@ -1179,6 +1394,8 @@ AppendCopyBinaryHeaders(CopyOutState headerOutputState)
/* No header extension */
CopySendInt32(headerOutputState, zero);
MemoryContextSwitchTo(oldContext);
}
@ -1190,8 +1407,52 @@ void
AppendCopyBinaryFooters(CopyOutState footerOutputState)
{
int16 negative = -1;
MemoryContext oldContext = MemoryContextSwitchTo(footerOutputState->rowcontext);
CopySendInt16(footerOutputState, negative);
MemoryContextSwitchTo(oldContext);
}
/*
* StartCopyToNewShard creates a new shard and related shard placements and
* opens connections to shard placements.
*/
static void
StartCopyToNewShard(ShardConnections *shardConnections, Oid relationId,
CopyStmt *copyStatement)
{
char *relationName = get_rel_name(relationId);
text *relationNameText = cstring_to_text(relationName);
Datum relationNameDatum = PointerGetDatum(relationNameText);
Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
relationNameDatum);
int64 shardId = DatumGetInt64(shardIdDatum);
shardConnections->shardId = shardId;
list_free_deep(shardConnections->connectionList);
shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */
OpenCopyTransactions(copyStatement, shardConnections);
}
/*
* FinalizeCopyToNewShard commits copy transaction and closes connections to
* shard placements.
*/
static void
FinalizeCopyToNewShard(ShardConnections *shardConnections)
{
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->connectionList, true);
/* commit transactions and close connections */
CommitRemoteTransactions(shardConnections->connectionList, true);
CloseConnections(shardConnections->connectionList);
}

View File

@ -211,7 +211,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
* We mark shard placements that we couldn't drop as to be deleted later, but
* we do delete the shard metadadata.
*/
int
static int
DropShards(Oid relationId, char *schemaName, char *relationName,
List *deletableShardIntervalList)
{

View File

@ -43,7 +43,7 @@
static bool WorkerCreateShard(char *nodeName, uint32 nodePort,
uint64 shardId, List *ddlCommandList);
static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, uint64 *shardLength,
char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue);
static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId,
char *tableName);
@ -159,14 +159,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
List *succeededPlacementList = NIL;
List *failedPlacementList = NIL;
ListCell *shardPlacementCell = NULL;
ListCell *succeededPlacementCell = NULL;
ListCell *failedPlacementCell = NULL;
bool statsOK = false;
uint64 newShardLength = 0;
uint64 newShardSize = 0;
uint64 shardMaxSizeInBytes = 0;
float4 shardFillLevel = 0.0;
text *newMinValue = NULL;
text *newMaxValue = NULL;
char partitionMethod = 0;
ShardInterval *shardInterval = LoadShardInterval(shardId);
@ -264,64 +260,12 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
RESUME_INTERRUPTS();
/* get appended shard's statistics from a shard placement */
foreach(succeededPlacementCell, succeededPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(succeededPlacementCell);
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
&newShardLength, &newMinValue, &newMaxValue);
if (statsOK)
{
break;
}
}
/*
* If for some reason we appended data to a shard, but failed to retrieve
* statistics we just WARN here to avoid losing shard-state updates. Note
* that this means we will return 0 as the shard fill-factor, and this shard
* also won't be pruned as the statistics will be empty. If the failure was
* transient, a subsequent append call will fetch the correct statistics.
*/
if (!statsOK)
{
ereport(WARNING, (errmsg("could not get statistics for shard placement"),
errdetail("Setting shard statistics to NULL")));
}
/* make sure we don't process cancel signals */
HOLD_INTERRUPTS();
/* update metadata for each shard placement we appended to */
succeededPlacementCell = NULL;
foreach(succeededPlacementCell, succeededPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(succeededPlacementCell);
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
DeleteShardPlacementRow(shardId, workerName, workerPort);
InsertShardPlacementRow(shardId, FILE_FINALIZED, newShardLength,
workerName, workerPort);
}
DeleteShardRow(shardId);
InsertShardRow(relationId, shardId, storageType, newMinValue, newMaxValue);
if (QueryCancelPending)
{
ereport(WARNING, (errmsg("cancel requests are ignored during table appends")));
QueryCancelPending = false;
}
RESUME_INTERRUPTS();
/* update shard statistics and get new shard size */
newShardSize = UpdateShardStatistics(relationId, shardId);
/* calculate ratio of current shard size compared to shard max size */
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
shardFillLevel = ((float4) newShardLength / (float4) shardMaxSizeInBytes);
shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
PG_RETURN_FLOAT4(shardFillLevel);
}
@ -446,13 +390,99 @@ WorkerCreateShard(char *nodeName, uint32 nodePort,
}
/*
* UpdateShardStatistics updates metadata for the given shard id and returns
* the new shard size.
*/
uint64
UpdateShardStatistics(Oid relationId, int64 shardId)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
char storageType = shardInterval->storageType;
char *shardName = NULL;
List *shardPlacementList = NIL;
ListCell *shardPlacementCell = NULL;
bool statsOK = false;
uint64 shardSize = 0;
text *minValue = NULL;
text *maxValue = NULL;
/* if shard doesn't have an alias, extend regular table name */
shardName = LoadShardAlias(relationId, shardId);
if (shardName == NULL)
{
shardName = get_rel_name(relationId);
AppendShardIdToName(&shardName, shardId);
}
shardPlacementList = FinalizedShardPlacementList(shardId);
/* get shard's statistics from a shard placement */
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
&shardSize, &minValue, &maxValue);
if (statsOK)
{
break;
}
}
/*
* If for some reason we appended data to a shard, but failed to retrieve
* statistics we just WARN here to avoid losing shard-state updates. Note
* that this means we will return 0 as the shard fill-factor, and this shard
* also won't be pruned as the statistics will be empty. If the failure was
* transient, a subsequent append call will fetch the correct statistics.
*/
if (!statsOK)
{
ereport(WARNING, (errmsg("could not get statistics for shard %s", shardName),
errdetail("Setting shard statistics to NULL")));
}
/* make sure we don't process cancel signals */
HOLD_INTERRUPTS();
/* update metadata for each shard placement we appended to */
shardPlacementCell = NULL;
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
DeleteShardPlacementRow(shardId, workerName, workerPort);
InsertShardPlacementRow(shardId, FILE_FINALIZED, shardSize,
workerName, workerPort);
}
DeleteShardRow(shardId);
InsertShardRow(relationId, shardId, storageType, minValue, maxValue);
if (QueryCancelPending)
{
ereport(WARNING, (errmsg("cancel requests are ignored during metadata update")));
QueryCancelPending = false;
}
RESUME_INTERRUPTS();
return shardSize;
}
/*
* WorkerShardStats queries the worker node, and retrieves shard statistics that
* we assume have changed after new table data have been appended to the shard.
*/
static bool
WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName,
uint64 *shardLength, text **shardMinValue, text **shardMaxValue)
uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
{
bool shardStatsOK = true;
@ -464,7 +494,7 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
StringInfo maxValue = WorkerPartitionValue(nodeName, nodePort, relationId,
shardName, SHARD_MAX_VALUE_QUERY);
(*shardLength) = tableSize;
(*shardSize) = tableSize;
(*shardMinValue) = cstring_to_text_with_len(minValue->data, minValue->len);
(*shardMaxValue) = cstring_to_text_with_len(maxValue->data, maxValue->len);
}

View File

@ -135,10 +135,15 @@ AbortRemoteTransactions(List *connectionList)
/*
* CommitRemoteTransactions commits all transactions on connections in connectionList.
* On failure, it reports a warning and continues committing all of them.
* If stopOnFailure is true, then CommitRemoteTransactions reports an error on
* failure, otherwise it reports a warning.
* Note that if the caller of this function wants the transactions to roll back
* on a failing commit, stopOnFailure should be used as true. On the other hand,
* if the caller does not want the transactions to roll back on a failing commit,
* stopOnFailure should be used as false.
*/
void
CommitRemoteTransactions(List *connectionList)
CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
{
ListCell *connectionCell = NULL;
@ -166,11 +171,24 @@ CommitRemoteTransactions(List *connectionList)
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
/* log a warning so the user may commit the transaction later */
ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
transactionName->data),
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
/*
* If stopOnFailure is false, log a warning so the user may
* commit the transaction later.
*/
if (stopOnFailure)
{
ereport(ERROR, (errmsg("failed to commit prepared transaction '%s'",
transactionName->data),
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
}
else
{
ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
transactionName->data),
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
}
}
}
else
@ -178,15 +196,26 @@ CommitRemoteTransactions(List *connectionList)
/* we shouldn't be committing if any transactions are not open */
Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);
/* try to commit, if it fails then the user might lose data */
/*
* Try to commit, if it fails and stopOnFailure is false then
* the user might lose data.
*/
result = PQexec(connection, "COMMIT");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
nodeName, nodePort)));
if (stopOnFailure)
{
ereport(ERROR, (errmsg("failed to commit transaction on %s:%s",
nodeName, nodePort)));
}
else
{
ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
nodeName, nodePort)));
}
}
}

View File

@ -85,6 +85,7 @@ extern void CheckDistributedTable(Oid relationId);
extern void CreateShardPlacements(int64 shardId, List *ddlEventList,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(Oid relationId, int64 shardId);
/* Function declarations for generating metadata for shard creation */
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);

View File

@ -51,7 +51,7 @@ typedef struct TransactionConnection
extern void InitializeDistributedTransaction(void);
extern void PrepareRemoteTransactions(List *connectionList);
extern void AbortRemoteTransactions(List *connectionList);
extern void CommitRemoteTransactions(List *connectionList);
extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
extern void CloseConnections(List *connectionList);

View File

@ -1,6 +1,7 @@
--
-- MULTI_COPY
--
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
c_custkey integer,
@ -188,3 +189,68 @@ FROM customer_copy_range WHERE c_custkey <= 500;
-- Check whether data was copied
SELECT count(*) FROM customer_copy_range;
-- Create a new append-partitioned table into which to COPY
CREATE TABLE customer_copy_append (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117));
SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
-- Test syntax error
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
1,customer1
2,customer2
notinteger,customernot
\.
-- Test that no shard is created for failing copy
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
-- Test empty copy
COPY customer_copy_append FROM STDIN;
\.
-- Test that no shard is created for copying zero rows
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
-- Test proper copy
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
1,customer1
2,customer2
\.
-- Check whether data was copied properly
SELECT * FROM customer_copy_append;
-- Create lineitem table
CREATE TABLE lineitem_copy_append (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null);
SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
-- Test multiple shard creation
SET citus.shard_max_size TO '256kB';
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;

View File

@ -225,3 +225,82 @@ SELECT count(*) FROM customer_copy_range;
1000
(1 row)
-- Create a new append-partitioned table into which to COPY
CREATE TABLE customer_copy_append (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117));
SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Test syntax error
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
ERROR: invalid input syntax for integer: "notinteger"
CONTEXT: COPY customer_copy_append, line 3, column c_custkey: "notinteger"
-- Test that no shard is created for failing copy
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
count
-------
0
(1 row)
-- Test empty copy
COPY customer_copy_append FROM STDIN;
-- Test that no shard is created for copying zero rows
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
count
-------
0
(1 row)
-- Test proper copy
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
-- Check whether data was copied properly
SELECT * FROM customer_copy_append;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
-----------+-----------+-----------+-------------+---------+-----------+--------------+-----------
1 | customer1 | | | | | |
2 | customer2 | | | | | |
(2 rows)
-- Create lineitem table
CREATE TABLE lineitem_copy_append (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null);
SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Test multiple shard creation
SET citus.shard_max_size TO '256kB';
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
count
-------
5
(1 row)