diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 5f90cf783..53b73c762 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -7,22 +7,28 @@
 * The CitusCopyFrom function should be called from the utility hook to
 * process COPY ... FROM commands on distributed tables. CitusCopyFrom
 * parses the input from stdin, a program executed on the master, or a file
- * on the master, and decides into which shard to put the data. It opens a
- * new connection for every shard placement and uses the PQputCopyData
- * function to copy the data. Because PQputCopyData transmits data,
- * asynchronously, the workers will ingest data at least partially in
- * parallel.
+ * on the master, and decides whether to copy new rows into existing shards or
+ * into new shards based on the partition method of the distributed table.
+ *
+ * It opens a new connection for every shard placement and uses the PQputCopyData
+ * function to copy the data. Because PQputCopyData transmits data asynchronously,
+ * the workers will ingest data at least partially in parallel.
 *
 * When failing to connect to a worker, the master marks the placement for
 * which it was trying to open a connection as inactive, similar to the way
 * DML statements are handled. If a failure occurs after connecting, the
- * transaction is rolled back on all the workers.
+ * transaction is rolled back on all the workers. Note that if the underlying
+ * table is append-partitioned, metadata changes are rolled back on the master
+ * node, but shard placements are left behind on the workers.
 *
- * By default, COPY uses normal transactions on the workers. This can cause
- * a problem when some of the transactions fail to commit while others have
- * succeeded. To ensure no data is lost, COPY can use two-phase commit, by
- * increasing max_prepared_transactions on the worker and setting
- * citus.copy_transaction_manager to '2pc'. The default is '1pc'.
+ * By default, COPY uses normal transactions on the workers. For hash- and
+ * range-partitioned tables, this can cause a problem when some of the
+ * transactions fail to commit while others have succeeded. To ensure no data
+ * is lost, COPY can use two-phase commit, by increasing max_prepared_transactions
+ * on the worker and setting citus.copy_transaction_manager to '2pc'. The default
+ * is '1pc'. This is not a problem for append-partitioned tables because new
+ * shards are created for them and, in case of failure, the metadata changes are
+ * rolled back on the master node.
 *
 * Parsing options are processed and enforced on the master, while
 * constraints are enforced on the worker. In either case, failure causes
@@ -64,6 +70,7 @@
 #include "distributed/listutils.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
+#include "distributed/metadata_cache.h"
 #include "distributed/multi_copy.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_transaction.h"
@@ -132,6 +139,8 @@ typedef struct ShardConnections
 /* Local functions forward declarations */
+static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
+static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag);
 static void LockAllShards(List *shardIntervalList);
 static HTAB * CreateShardConnectionHash(void);
 static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
@@ -153,6 +162,8 @@ static ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
 											  bool *shardConnectionsFound);
 static void OpenCopyTransactions(CopyStmt *copyStatement,
 								 ShardConnections *shardConnections);
+static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
+static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
 static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
 static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
 static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
@@ -161,6 +172,9 @@ static List * ConnectionList(HTAB *connectionHash);
 static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
 static void ReportCopyError(PGconn *connection, PGresult *result);
 static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
+static void StartCopyToNewShard(ShardConnections *shardConnections,
+								Oid relationId, CopyStmt *copyStatement);
+static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
 
 /* Private functions copied and adapted from copy.c in PostgreSQL */
 static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@@ -173,17 +187,65 @@ static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *
 /*
- * CitusCopyFrom implements the COPY table_name FROM ... for hash-partitioned
- * and range-partitioned tables.
+ * CitusCopyFrom implements the COPY table_name FROM command. It dispatches the
+ * copy statement to the appropriate subfunction based on the partition method
+ * of the distributed table.
 */
 void
 CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
+{
+	Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
+	char partitionMethod = '\0';
+
+	/* disallow COPY to/from file or program except for superusers */
+	if (copyStatement->filename != NULL && !superuser())
+	{
+		if (copyStatement->is_program)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser to COPY to or from an external program"),
+					 errhint("Anyone can COPY to stdout or from stdin. "
+							 "psql's \\copy command also works for anyone.")));
+		}
+		else
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser to COPY to or from a file"),
" + "psql's \\copy command also works for anyone."))); + } + } + + partitionMethod = PartitionMethod(tableId); + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) + { + CopyToExistingShards(copyStatement, completionTag); + } + else if (partitionMethod == DISTRIBUTE_BY_APPEND) + { + CopyToNewShards(copyStatement, completionTag); + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unsupported partition method"))); + } +} + + +/* + * CopyToExistingShards implements the COPY table_name FROM ... for hash or + * range-partitioned tables where there are already shards into which to copy + * rows. + */ +static void +CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) { Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); char *relationName = get_rel_name(tableId); Relation distributedRelation = NULL; - char partitionMethod = '\0'; - Var *partitionColumn = NULL; TupleDesc tupleDescriptor = NULL; uint32 columnCount = 0; Datum *columnValues = NULL; @@ -210,35 +272,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) FmgrInfo *columnOutputFunctions = NULL; uint64 processedRowCount = 0; - /* disallow COPY to/from file or program except for superusers */ - if (copyStatement->filename != NULL && !superuser()) - { - if (copyStatement->is_program) - { - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to COPY to or from an external program"), - errhint("Anyone can COPY to stdout or from stdin. " - "psql's \\copy command also works for anyone."))); - } - else - { - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to COPY to or from a file"), - errhint("Anyone can COPY to stdout or from stdin. " - "psql's \\copy command also works for anyone."))); - } - } - - partitionColumn = PartitionColumn(tableId, 0); - partitionMethod = PartitionMethod(tableId); - if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY is only supported for hash- and " - "range-partitioned tables"))); - } + Var *partitionColumn = PartitionColumn(tableId, 0); + char partitionMethod = PartitionMethod(tableId); /* resolve hash function for partition column */ typeEntry = lookup_type_cache(partitionColumn->vartype, TYPECACHE_HASH_PROC_FINFO); @@ -310,7 +345,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) /* * Create a mapping of shard id to a connection for each of its placements. - * The hash should be initialized before the PG_TRY, since it is used and + * The hash should be initialized before the PG_TRY, since it is used in * PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp * documentation). 
 	 */
@@ -391,11 +426,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 			/* open connections and initiate COPY on shard placements */
 			OpenCopyTransactions(copyStatement, shardConnections);
 
-			/* send binary headers to shard placements */
-			resetStringInfo(copyOutState->fe_msgbuf);
-			AppendCopyBinaryHeaders(copyOutState);
-			SendCopyDataToAll(copyOutState->fe_msgbuf,
-							  shardConnections->connectionList);
+			/* send copy binary headers to shard placements */
+			SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
 		}
 
 		/* replicate row to shard placements */
@@ -409,10 +441,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 
 	connectionList = ConnectionList(shardConnectionHash);
 
-	/* send binary footers to all shard placements */
-	resetStringInfo(copyOutState->fe_msgbuf);
-	AppendCopyBinaryFooters(copyOutState);
-	SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+	/* send copy binary footers to all shard placements */
+	SendCopyBinaryFooters(copyOutState, connectionList);
 
 	/* all lines have been copied, stop showing line number in errors */
 	error_context_stack = errorCallback.previous;
@@ -450,7 +480,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 	 * we do not want any of the transactions rolled back if a failure occurs. Instead,
 	 * they should be rolled forward.
 	 */
-	CommitRemoteTransactions(connectionList);
+	CommitRemoteTransactions(connectionList, false);
 	CloseConnections(connectionList);
 
 	if (completionTag != NULL)
@@ -461,6 +491,171 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 }
 
 
+/*
+ * CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
+ * tables where we create new shards into which to copy rows.
+ */
+static void
+CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
+{
+	Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
+	FmgrInfo *columnOutputFunctions = NULL;
+
+	/* allocate column values and nulls arrays */
+	Relation distributedRelation = heap_open(relationId, RowExclusiveLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
+	uint32 columnCount = tupleDescriptor->natts;
+	Datum *columnValues = palloc0(columnCount * sizeof(Datum));
+	bool *columnNulls = palloc0(columnCount * sizeof(bool));
+
+	EState *executorState = CreateExecutorState();
+	MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
+	ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
+
+	/*
+	 * Shard connections should be initialized before the PG_TRY, since it is
+	 * used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH
+	 * (see sigsetjmp documentation).
+	 */
+	ShardConnections *shardConnections =
+		(ShardConnections *) palloc0(sizeof(ShardConnections));
+
+	/* initialize copy state to read from COPY data source */
+	CopyState copyState = BeginCopyFrom(distributedRelation,
+										copyStatement->filename,
+										copyStatement->is_program,
+										copyStatement->attlist,
+										copyStatement->options);
+
+	CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
+	copyOutState->binary = true;
+	copyOutState->fe_msgbuf = makeStringInfo();
+	copyOutState->rowcontext = executorTupleContext;
+
+	columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
+
+	/* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
+	PG_TRY();
+	{
+		uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
+		uint64 copiedDataSizeInBytes = 0;
+		uint64 processedRowCount = 0;
+
+		/* set up callback to identify error line number */
+		ErrorContextCallback errorCallback;
+
+		errorCallback.callback = CopyFromErrorCallback;
+		errorCallback.arg = (void *) copyState;
+		errorCallback.previous = error_context_stack;
+
+		while (true)
+		{
+			bool nextRowFound = false;
+			MemoryContext oldContext = NULL;
+			uint64 messageBufferSize = 0;
+
+			ResetPerTupleExprContext(executorState);
+
+			/* switch to tuple memory context and start showing line number in errors */
+			error_context_stack = &errorCallback;
+			oldContext = MemoryContextSwitchTo(executorTupleContext);
+
+			/* parse a row from the input */
+			nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
+										columnValues, columnNulls, NULL);
+
+			if (!nextRowFound)
+			{
+				MemoryContextSwitchTo(oldContext);
+				break;
+			}
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* switch to regular memory context and stop showing line number in errors */
+			MemoryContextSwitchTo(oldContext);
+			error_context_stack = errorCallback.previous;
+
+			/*
+			 * If copied data size is zero, this means either this is the first
+			 * line in the copy or we just filled the previous shard up to its
+			 * capacity. Either way, we need to create a new shard and
+			 * start copying new rows into it.
+			 */
+			if (copiedDataSizeInBytes == 0)
+			{
+				/* create shard and open connections to shard placements */
+				StartCopyToNewShard(shardConnections, relationId, copyStatement);
+
+				/* send copy binary headers to shard placements */
+				SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
+			}
+
+			/* replicate row to shard placements */
+			resetStringInfo(copyOutState->fe_msgbuf);
+			AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
+							  copyOutState, columnOutputFunctions);
+			SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList);
+
+			messageBufferSize = copyOutState->fe_msgbuf->len;
+			copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize;
+
+			/*
+			 * If we filled up this shard to its capacity, send copy binary footers
+			 * to shard placements, commit copy transactions, close connections
+			 * and finally update shard statistics.
+			 *
+			 */
+			if (copiedDataSizeInBytes > shardMaxSizeInBytes)
+			{
+				SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
+				FinalizeCopyToNewShard(shardConnections);
+				UpdateShardStatistics(relationId, shardConnections->shardId);
+
+				copiedDataSizeInBytes = 0;
+			}
+
+			processedRowCount += 1;
+		}
+
+		/*
+		 * For the last shard, send copy binary footers to shard placements,
+		 * commit copy transactions, close connections and finally update shard
+		 * statistics. If no rows were sent, there is no shard to finalize
+		 * for this copy command.
+		 */
+		if (copiedDataSizeInBytes > 0)
+		{
+			SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
+			FinalizeCopyToNewShard(shardConnections);
+			UpdateShardStatistics(relationId, shardConnections->shardId);
+		}
+
+		EndCopyFrom(copyState);
+		heap_close(distributedRelation, NoLock);
+
+		/* check for cancellation one last time before returning */
+		CHECK_FOR_INTERRUPTS();
+
+		if (completionTag != NULL)
+		{
+			snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
+					 "COPY " UINT64_FORMAT, processedRowCount);
+		}
+	}
+	PG_CATCH();
+	{
+		/* roll back all transactions */
+		EndRemoteCopy(shardConnections->connectionList, false);
+		AbortRemoteTransactions(shardConnections->connectionList);
+		CloseConnections(shardConnections->connectionList);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+
 /*
 * LockAllShards takes shared locks on the metadata and the data of all shards in
 * shardIntervalList. This prevents concurrent placement changes and concurrent
@@ -826,6 +1021,26 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 }
 
 
+/* Send copy binary headers to given connections */
+static void
+SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList)
+{
+	resetStringInfo(copyOutState->fe_msgbuf);
+	AppendCopyBinaryHeaders(copyOutState);
+	SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+}
+
+
+/* Send copy binary footers to given connections */
+static void
+SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
+{
+	resetStringInfo(copyOutState->fe_msgbuf);
+	AppendCopyBinaryFooters(copyOutState);
+	SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+}
+
+
 /*
 * ConstructCopyStatement constructs the text of a COPY statement for a particular
 * shard.
@@ -1060,13 +1275,12 @@ void
 AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
 				  CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions)
 {
-	MemoryContext oldContext = NULL;
 	uint32 totalColumnCount = (uint32) rowDescriptor->natts;
-	uint32 columnIndex = 0;
 	uint32 availableColumnCount = AvailableColumnCount(rowDescriptor);
 	uint32 appendedColumnCount = 0;
+	uint32 columnIndex = 0;
 
-	oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
+	MemoryContext oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
 
 	if (rowOutputState->binary)
 	{
@@ -1170,6 +1384,7 @@ void
 AppendCopyBinaryHeaders(CopyOutState headerOutputState)
 {
 	const int32 zero = 0;
+	MemoryContext oldContext = MemoryContextSwitchTo(headerOutputState->rowcontext);
 
 	/* Signature */
 	CopySendData(headerOutputState, BinarySignature, 11);
@@ -1179,6 +1394,8 @@ AppendCopyBinaryHeaders(CopyOutState headerOutputState)
 
 	/* No header extension */
 	CopySendInt32(headerOutputState, zero);
+
+	MemoryContextSwitchTo(oldContext);
 }
 
 
@@ -1190,8 +1407,52 @@ void
 AppendCopyBinaryFooters(CopyOutState footerOutputState)
 {
 	int16 negative = -1;
+	MemoryContext oldContext = MemoryContextSwitchTo(footerOutputState->rowcontext);
 
 	CopySendInt16(footerOutputState, negative);
+
+	MemoryContextSwitchTo(oldContext);
+}
+
+
+/*
+ * StartCopyToNewShard creates a new shard along with its shard placements and
+ * opens connections to those placements.
+ */
+static void
+StartCopyToNewShard(ShardConnections *shardConnections, Oid relationId,
+					CopyStmt *copyStatement)
+{
+	char *relationName = get_rel_name(relationId);
+	text *relationNameText = cstring_to_text(relationName);
+	Datum relationNameDatum = PointerGetDatum(relationNameText);
+	Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
+											 relationNameDatum);
+
+	int64 shardId = DatumGetInt64(shardIdDatum);
+	shardConnections->shardId = shardId;
+
+	list_free_deep(shardConnections->connectionList);
+	shardConnections->connectionList = NIL;
+
+	/* connect to shard placements and start transactions */
+	OpenCopyTransactions(copyStatement, shardConnections);
+}
+
+
+/*
+ * FinalizeCopyToNewShard commits the copy transactions and closes connections
+ * to the shard placements.
+ */
+static void
+FinalizeCopyToNewShard(ShardConnections *shardConnections)
+{
+	/* close the COPY input on all shard placements */
+	EndRemoteCopy(shardConnections->connectionList, true);
+
+	/* commit transactions and close connections */
+	CommitRemoteTransactions(shardConnections->connectionList, true);
+	CloseConnections(shardConnections->connectionList);
+}
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index 5b83802eb..8be57af52 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -211,7 +211,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
 * We mark shard placements that we couldn't drop as to be deleted later, but
 * we do delete the shard metadadata.
 */
-int
+static int
 DropShards(Oid relationId, char *schemaName, char *relationName,
 		   List *deletableShardIntervalList)
 {
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c
index 3a79c4a73..c9c7c7464 100644
--- a/src/backend/distributed/master/master_stage_protocol.c
+++ b/src/backend/distributed/master/master_stage_protocol.c
@@ -43,7 +43,7 @@ static bool WorkerCreateShard(char *nodeName, uint32 nodePort,
 							  uint64 shardId, List *ddlCommandList);
 static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
-							 char *shardName, uint64 *shardLength,
+							 char *shardName, uint64 *shardSize,
 							 text **shardMinValue, text **shardMaxValue);
 static uint64 WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId,
 							  char *tableName);
@@ -159,14 +159,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	List *succeededPlacementList = NIL;
 	List *failedPlacementList = NIL;
 	ListCell *shardPlacementCell = NULL;
-	ListCell *succeededPlacementCell = NULL;
 	ListCell *failedPlacementCell = NULL;
-	bool statsOK = false;
-	uint64 newShardLength = 0;
+	uint64 newShardSize = 0;
 	uint64 shardMaxSizeInBytes = 0;
 	float4 shardFillLevel = 0.0;
-	text *newMinValue = NULL;
-	text *newMaxValue = NULL;
 	char partitionMethod = 0;
 
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
@@ -264,64 +260,12 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 
 	RESUME_INTERRUPTS();
 
-	/* get appended shard's statistics from a shard placement */
-	foreach(succeededPlacementCell, succeededPlacementList)
-	{
-		ShardPlacement *placement = (ShardPlacement *) lfirst(succeededPlacementCell);
-		char *workerName = placement->nodeName;
-		uint32 workerPort = placement->nodePort;
-
-		statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
-								   &newShardLength, &newMinValue, &newMaxValue);
-		if (statsOK)
-		{
-			break;
-		}
-	}
-
-	/*
-	 * If for some reason we appended data to a shard, but failed to retrieve
-	 * statistics we just WARN here to avoid losing shard-state updates. Note
-	 * that this means we will return 0 as the shard fill-factor, and this shard
-	 * also won't be pruned as the statistics will be empty. If the failure was
-	 * transient, a subsequent append call will fetch the correct statistics.
-	 */
-	if (!statsOK)
-	{
-		ereport(WARNING, (errmsg("could not get statistics for shard placement"),
-						  errdetail("Setting shard statistics to NULL")));
-	}
-
-	/* make sure we don't process cancel signals */
-	HOLD_INTERRUPTS();
-
-	/* update metadata for each shard placement we appended to */
-	succeededPlacementCell = NULL;
-	foreach(succeededPlacementCell, succeededPlacementList)
-	{
-		ShardPlacement *placement = (ShardPlacement *) lfirst(succeededPlacementCell);
-		char *workerName = placement->nodeName;
-		uint32 workerPort = placement->nodePort;
-
-		DeleteShardPlacementRow(shardId, workerName, workerPort);
-		InsertShardPlacementRow(shardId, FILE_FINALIZED, newShardLength,
-								workerName, workerPort);
-	}
-
-	DeleteShardRow(shardId);
-	InsertShardRow(relationId, shardId, storageType, newMinValue, newMaxValue);
-
-	if (QueryCancelPending)
-	{
-		ereport(WARNING, (errmsg("cancel requests are ignored during table appends")));
-		QueryCancelPending = false;
-	}
-
-	RESUME_INTERRUPTS();
+	/* update shard statistics and get new shard size */
+	newShardSize = UpdateShardStatistics(relationId, shardId);
 
 	/* calculate ratio of current shard size compared to shard max size */
 	shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
-	shardFillLevel = ((float4) newShardLength / (float4) shardMaxSizeInBytes);
+	shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
 
 	PG_RETURN_FLOAT4(shardFillLevel);
 }
@@ -446,13 +390,99 @@ WorkerCreateShard(char *nodeName, uint32 nodePort,
 }
 
 
+/*
+ * UpdateShardStatistics updates metadata for the given shard id and returns
+ * the new shard size.
+ */
+uint64
+UpdateShardStatistics(Oid relationId, int64 shardId)
+{
+	ShardInterval *shardInterval = LoadShardInterval(shardId);
+	char storageType = shardInterval->storageType;
+	char *shardName = NULL;
+	List *shardPlacementList = NIL;
+	ListCell *shardPlacementCell = NULL;
+	bool statsOK = false;
+	uint64 shardSize = 0;
+	text *minValue = NULL;
+	text *maxValue = NULL;
+
+	/* if shard doesn't have an alias, extend regular table name */
+	shardName = LoadShardAlias(relationId, shardId);
+	if (shardName == NULL)
+	{
+		shardName = get_rel_name(relationId);
+		AppendShardIdToName(&shardName, shardId);
+	}
+
+	shardPlacementList = FinalizedShardPlacementList(shardId);
+
+	/* get shard's statistics from a shard placement */
+	foreach(shardPlacementCell, shardPlacementList)
+	{
+		ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+		char *workerName = placement->nodeName;
+		uint32 workerPort = placement->nodePort;
+
+		statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
+								   &shardSize, &minValue, &maxValue);
+		if (statsOK)
+		{
+			break;
+		}
+	}
+
+	/*
+	 * If for some reason we appended data to a shard, but failed to retrieve
+	 * statistics we just WARN here to avoid losing shard-state updates. Note
+	 * that this means we will return 0 as the shard fill-factor, and this shard
+	 * also won't be pruned as the statistics will be empty. If the failure was
+	 * transient, a subsequent append call will fetch the correct statistics.
+	 */
+	if (!statsOK)
+	{
+		ereport(WARNING, (errmsg("could not get statistics for shard %s", shardName),
+						  errdetail("Setting shard statistics to NULL")));
+	}
+
+	/* make sure we don't process cancel signals */
+	HOLD_INTERRUPTS();
+
+	/* update metadata for each shard placement we appended to */
+	shardPlacementCell = NULL;
+	foreach(shardPlacementCell, shardPlacementList)
+	{
+		ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+		char *workerName = placement->nodeName;
+		uint32 workerPort = placement->nodePort;
+
+		DeleteShardPlacementRow(shardId, workerName, workerPort);
+		InsertShardPlacementRow(shardId, FILE_FINALIZED, shardSize,
+								workerName, workerPort);
+	}
+
+	DeleteShardRow(shardId);
+	InsertShardRow(relationId, shardId, storageType, minValue, maxValue);
+
+	if (QueryCancelPending)
+	{
+		ereport(WARNING, (errmsg("cancel requests are ignored during metadata update")));
+		QueryCancelPending = false;
+	}
+
+	RESUME_INTERRUPTS();
+
+	return shardSize;
+}
+
+
 /*
 * WorkerShardStats queries the worker node, and retrieves shard statistics that
 * we assume have changed after new table data have been appended to the shard.
 */
 static bool
 WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName,
-				 uint64 *shardLength, text **shardMinValue, text **shardMaxValue)
+				 uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
 {
 	bool shardStatsOK = true;
 
@@ -464,7 +494,7 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
 		StringInfo maxValue = WorkerPartitionValue(nodeName, nodePort, relationId,
 												   shardName, SHARD_MAX_VALUE_QUERY);
 
-		(*shardLength) = tableSize;
+		(*shardSize) = tableSize;
 		(*shardMinValue) = cstring_to_text_with_len(minValue->data, minValue->len);
 		(*shardMaxValue) = cstring_to_text_with_len(maxValue->data, maxValue->len);
 	}
diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c
index 46b129a52..84cde1a73 100644
--- a/src/backend/distributed/utils/multi_transaction.c
+++ b/src/backend/distributed/utils/multi_transaction.c
@@ -135,10 +135,15 @@ AbortRemoteTransactions(List *connectionList)
 
 /*
 * CommitRemoteTransactions commits all transactions on connections in connectionList.
- * On failure, it reports a warning and continues committing all of them.
+ * If stopOnFailure is true, CommitRemoteTransactions reports an error when a
+ * commit fails; otherwise it only reports a warning and continues committing
+ * the remaining transactions.
+ * In other words, callers that want the operation to be rolled back when a
+ * commit fails should pass stopOnFailure as true; callers that want to keep
+ * going despite a failed commit should pass false.
 */
 void
-CommitRemoteTransactions(List *connectionList)
+CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
 {
 	ListCell *connectionCell = NULL;
 
@@ -166,11 +171,24 @@ CommitRemoteTransactions(List *connectionList)
 			char *nodeName = ConnectionGetOptionValue(connection, "host");
 			char *nodePort = ConnectionGetOptionValue(connection, "port");
 
-			/* log a warning so the user may commit the transaction later */
-			ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
-									 transactionName->data),
-							  errhint("Run \"%s\" on %s:%s",
-									  command->data, nodeName, nodePort)));
+			/*
+			 * If stopOnFailure is true, report an error; otherwise log a
+			 * warning so the user may commit the transaction later.
+			 */
+			if (stopOnFailure)
+			{
+				ereport(ERROR, (errmsg("failed to commit prepared transaction '%s'",
+									   transactionName->data),
+								errhint("Run \"%s\" on %s:%s",
+										command->data, nodeName, nodePort)));
+			}
+			else
+			{
+				ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
+										 transactionName->data),
+								  errhint("Run \"%s\" on %s:%s",
+										  command->data, nodeName, nodePort)));
+			}
 		}
 	}
 	else
@@ -178,15 +196,26 @@ CommitRemoteTransactions(List *connectionList)
 		/* we shouldn't be committing if any transactions are not open */
 		Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);
 
-		/* try to commit, if it fails then the user might lose data */
+		/*
+		 * Try to commit; if the commit fails and stopOnFailure is false,
+		 * the user might lose data.
+		 */
 		result = PQexec(connection, "COMMIT");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
 			char *nodeName = ConnectionGetOptionValue(connection, "host");
 			char *nodePort = ConnectionGetOptionValue(connection, "port");
 
-			ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
-									 nodeName, nodePort)));
+			if (stopOnFailure)
+			{
+				ereport(ERROR, (errmsg("failed to commit transaction on %s:%s",
+									   nodeName, nodePort)));
+			}
+			else
+			{
+				ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
+										 nodeName, nodePort)));
+			}
 		}
 	}
diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h
index c3d543bca..e9f76a093 100644
--- a/src/include/distributed/master_protocol.h
+++ b/src/include/distributed/master_protocol.h
@@ -85,6 +85,7 @@ extern void CheckDistributedTable(Oid relationId);
 extern void CreateShardPlacements(int64 shardId, List *ddlEventList,
 								  List *workerNodeList, int workerStartIndex,
 								  int replicationFactor);
+extern uint64 UpdateShardStatistics(Oid relationId, int64 shardId);
 
 /* Function declarations for generating metadata for shard creation */
 extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
diff --git a/src/include/distributed/multi_transaction.h b/src/include/distributed/multi_transaction.h
index 760899e44..a84e1f7fe 100644
--- a/src/include/distributed/multi_transaction.h
+++ b/src/include/distributed/multi_transaction.h
@@ -51,7 +51,7 @@ typedef struct TransactionConnection
 
 extern void InitializeDistributedTransaction(void);
 extern void PrepareRemoteTransactions(List *connectionList);
 extern void AbortRemoteTransactions(List *connectionList);
-extern void CommitRemoteTransactions(List *connectionList);
+extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
 extern void CloseConnections(List *connectionList);
diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source
index 8a690fba8..a59bdf8b2 100644
--- a/src/test/regress/input/multi_copy.source
+++ b/src/test/regress/input/multi_copy.source
@@ -1,6 +1,7 @@
 --
 -- MULTI_COPY
 --
+
 -- Create a new hash-partitioned table into which to COPY
 CREATE TABLE customer_copy_hash (
 	c_custkey integer,
@@ -188,3 +189,68 @@ FROM customer_copy_range WHERE c_custkey <= 500;
 
 -- Check whether data was copied
 SELECT count(*) FROM customer_copy_range;
+
+-- Create a new append-partitioned table into which to COPY
+CREATE TABLE customer_copy_append (
+	c_custkey integer,
+	c_name varchar(25) not null,
+	c_address varchar(40),
+	c_nationkey integer,
+	c_phone char(15),
+	c_acctbal decimal(15,2),
+	c_mktsegment char(10),
+	c_comment varchar(117));
+SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
+
+-- Test syntax error
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+notinteger,customernot
+\.
+
+-- Test that no shard is created for failing copy
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+
+-- Test empty copy
+COPY customer_copy_append FROM STDIN;
+\.
+
+-- Test that no shard is created for copying zero rows
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+
+-- Test proper copy
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+\.
+
+-- Check whether data was copied properly
+SELECT * FROM customer_copy_append;
+
+-- Create lineitem table
+CREATE TABLE lineitem_copy_append (
+	l_orderkey bigint not null,
+	l_partkey integer not null,
+	l_suppkey integer not null,
+	l_linenumber integer not null,
+	l_quantity decimal(15, 2) not null,
+	l_extendedprice decimal(15, 2) not null,
+	l_discount decimal(15, 2) not null,
+	l_tax decimal(15, 2) not null,
+	l_returnflag char(1) not null,
+	l_linestatus char(1) not null,
+	l_shipdate date not null,
+	l_commitdate date not null,
+	l_receiptdate date not null,
+	l_shipinstruct char(25) not null,
+	l_shipmode char(10) not null,
+	l_comment varchar(44) not null);
+SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
+
+-- Test multiple shard creation
+SET citus.shard_max_size TO '256kB';
+
+COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
+
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source
index c43e04292..804dffb3a 100644
--- a/src/test/regress/output/multi_copy.source
+++ b/src/test/regress/output/multi_copy.source
@@ -225,3 +225,82 @@ SELECT count(*) FROM customer_copy_range;
  1000
 (1 row)
 
+-- Create a new append-partitioned table into which to COPY
+CREATE TABLE customer_copy_append (
+	c_custkey integer,
+	c_name varchar(25) not null,
+	c_address varchar(40),
+	c_nationkey integer,
+	c_phone char(15),
+	c_acctbal decimal(15,2),
+	c_mktsegment char(10),
+	c_comment varchar(117));
+SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test syntax error
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+ERROR:  invalid input syntax for integer: "notinteger"
+CONTEXT:  COPY customer_copy_append, line 3, column c_custkey: "notinteger"
+-- Test that no shard is created for failing copy
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+ count
+-------
+     0
+(1 row)
+
+-- Test empty copy
+COPY customer_copy_append FROM STDIN;
+-- Test that no shard is created for copying zero rows
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+ count
+-------
+     0
+(1 row)
+
+-- Test proper copy
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+-- Check whether data was copied properly
+SELECT * FROM customer_copy_append;
+ c_custkey |  c_name   | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
+-----------+-----------+-----------+-------------+---------+-----------+--------------+-----------
+         1 | customer1 |           |             |         |           |              |
+         2 | customer2 |           |             |         |           |              |
+(2 rows)
+
+-- Create lineitem table
+CREATE TABLE lineitem_copy_append (
+	l_orderkey bigint not null,
+	l_partkey integer not null,
+	l_suppkey integer not null,
+	l_linenumber integer not null,
+	l_quantity decimal(15, 2) not null,
+	l_extendedprice decimal(15, 2) not null,
+	l_discount decimal(15, 2) not null,
+	l_tax decimal(15, 2) not null,
+	l_returnflag char(1) not null,
+	l_linestatus char(1) not null,
+	l_shipdate date not null,
+	l_commitdate date not null,
+	l_receiptdate date not null,
+	l_shipinstruct char(25) not null,
+	l_shipmode char(10) not null,
+	l_comment varchar(44) not null);
+SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test multiple shard creation
+SET citus.shard_max_size TO '256kB';
+COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
+ count
+-------
+     5
+(1 row)
+