From d74fb764b13b23b6d753e343f535bedf16942d1c Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Wed, 25 Jan 2017 14:14:17 +0100
Subject: [PATCH] Use CitusCopyDestReceiver for regular COPY

---
 src/backend/distributed/commands/multi_copy.c | 244 ++++----------
 src/test/regress/input/multi_copy.source      |   4 +-
 src/test/regress/output/multi_copy.source     |  16 +-
 3 files changed, 58 insertions(+), 206 deletions(-)

diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index dfdd7c1da..f98dd83a7 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -287,49 +287,31 @@ static void
 CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 {
     Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
-    char *relationName = get_rel_name(tableId);
+
+    CitusCopyDestReceiver *copyDest = NULL;
+    DestReceiver *dest = NULL;
+
     Relation distributedRelation = NULL;
     TupleDesc tupleDescriptor = NULL;
     uint32 columnCount = 0;
     Datum *columnValues = NULL;
     bool *columnNulls = NULL;
-    FmgrInfo *hashFunction = NULL;
-    FmgrInfo *compareFunction = NULL;
-    bool hasUniformHashDistribution = false;
-    DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
-    const char *delimiterCharacter = "\t";
-    const char *nullPrintCharacter = "\\N";
-
-    int shardCount = 0;
-    List *shardIntervalList = NULL;
-    ShardInterval **shardIntervalCache = NULL;
-    bool useBinarySearch = false;
-
-    HTAB *shardConnectionHash = NULL;
-    ShardConnections *shardConnections = NULL;
-    List *shardConnectionsList = NIL;
-    ListCell *shardConnectionsCell = NULL;
+    int columnIndex = 0;
+    List *columnNameList = NIL;
+    TupleTableSlot *tupleTableSlot = NULL;

     EState *executorState = NULL;
     MemoryContext executorTupleContext = NULL;
     ExprContext *executorExpressionContext = NULL;

+    char partitionMethod = 0;
+    bool stopOnFailure = false;
+
     CopyState copyState = NULL;
-    CopyOutState copyOutState = NULL;
-    FmgrInfo *columnOutputFunctions = NULL;
     uint64 processedRowCount = 0;

-    Var *partitionColumn = PartitionColumn(tableId, 0);
-    char partitionMethod = PartitionMethod(tableId);
-
     ErrorContextCallback errorCallback;

-    /* get hash function for partition column */
-    hashFunction = cacheEntry->hashFunction;
-
-    /* get compare function for shard intervals */
-    compareFunction = cacheEntry->shardIntervalCompareFunction;
-
     /* allocate column values and nulls arrays */
     distributedRelation = heap_open(tableId, RowExclusiveLock);
     tupleDescriptor = RelationGetDescr(distributedRelation);
@@ -337,64 +319,40 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
     columnValues = palloc0(columnCount * sizeof(Datum));
     columnNulls = palloc0(columnCount * sizeof(bool));

-    /* we don't support copy to reference tables from workers */
+    /* set up a virtual tuple table slot */
+    tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor);
+    tupleTableSlot->tts_nvalid = columnCount;
+    tupleTableSlot->tts_values = columnValues;
+    tupleTableSlot->tts_isnull = columnNulls;
+
+    for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
+    {
+        Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
+        char *columnName = NameStr(currentColumn->attname);
+
+        if (currentColumn->attisdropped)
+        {
+            continue;
+        }
+
+        columnNameList = lappend(columnNameList, columnName);
+    }
+
+    executorState = CreateExecutorState();
+    executorTupleContext = GetPerTupleMemoryContext(executorState);
+    executorExpressionContext = GetPerTupleExprContext(executorState);
+
+    partitionMethod = PartitionMethod(tableId);
     if (partitionMethod == DISTRIBUTE_BY_NONE)
     {
-        EnsureCoordinator();
+        stopOnFailure = true;
     }

-    /* load the list of shards and verify that we have shards to copy into */
-    shardIntervalList = LoadShardIntervalList(tableId);
-    if (shardIntervalList == NIL)
-    {
-        if (partitionMethod == DISTRIBUTE_BY_HASH)
-        {
-            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                            errmsg("could not find any shards into which to copy"),
-                            errdetail("No shards exist for distributed table \"%s\".",
-                                      relationName),
-                            errhint("Run master_create_worker_shards to create shards "
-                                    "and try again.")));
-        }
-        else
-        {
-            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                            errmsg("could not find any shards into which to copy"),
-                            errdetail("No shards exist for distributed table \"%s\".",
-                                      relationName)));
-        }
-    }
-
-    /* error if any shard missing min/max values for non reference tables */
-    if (partitionMethod != DISTRIBUTE_BY_NONE &&
-        cacheEntry->hasUninitializedShardInterval)
-    {
-        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                        errmsg("could not start copy"),
-                        errdetail("Distributed relation \"%s\" has shards "
-                                  "with missing shardminvalue/shardmaxvalue.",
-                                  relationName)));
-    }
-
-    /* prevent concurrent placement changes and non-commutative DML statements */
-    LockShardListMetadata(shardIntervalList, ShareLock);
-    LockShardListResources(shardIntervalList, ShareLock);
-
-    /* initialize the shard interval cache */
-    shardCount = cacheEntry->shardIntervalArrayLength;
-    shardIntervalCache = cacheEntry->sortedShardIntervalArray;
-    hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution;
-
-    /* determine whether to use binary search */
-    if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution)
-    {
-        useBinarySearch = true;
-    }
-
-    if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
-    {
-        CoordinatedTransactionUse2PC();
-    }
+    /* set up the destination for the COPY */
+    copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState,
+                                           stopOnFailure);
+    dest = (DestReceiver *) copyDest;
+    dest->rStartup(dest, 0, tupleDescriptor);

     /* initialize copy state to read from COPY data source */
     copyState = BeginCopyFrom(distributedRelation,
@@ -403,30 +361,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
                               copyStatement->attlist,
                               copyStatement->options);

-    executorState = CreateExecutorState();
-    executorTupleContext = GetPerTupleMemoryContext(executorState);
-    executorExpressionContext = GetPerTupleExprContext(executorState);
-
-    copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
-    copyOutState->delim = (char *) delimiterCharacter;
-    copyOutState->null_print = (char *) nullPrintCharacter;
-    copyOutState->null_print_client = (char *) nullPrintCharacter;
-    copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
-    copyOutState->fe_msgbuf = makeStringInfo();
-    copyOutState->rowcontext = executorTupleContext;
-
-    columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
-
-    /* create a mapping of shard id to a connection for each of its placements */
-    shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
-
-    /*
-     * From here on we use copyStatement as the template for the command
-     * that we send to workers. This command does not have an attribute
-     * list since NextCopyFrom will generate a value for all columns.
-     */
-    copyStatement->attlist = NIL;
-
     /* set up callback to identify error line number */
     errorCallback.callback = CopyFromErrorCallback;
     errorCallback.arg = (void *) copyState;
@@ -436,10 +370,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
     while (true)
     {
         bool nextRowFound = false;
-        Datum partitionColumnValue = 0;
-        ShardInterval *shardInterval = NULL;
-        int64 shardId = 0;
-        bool shardConnectionsFound = false;
         MemoryContext oldContext = NULL;

         ResetPerTupleExprContext(executorState);
@@ -458,103 +388,23 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)

         CHECK_FOR_INTERRUPTS();

-        /*
-         * Find the partition column value and corresponding shard interval
-         * for non-reference tables.
-         * Get the existing (and only a single) shard interval for the reference
-         * tables. Note that, reference tables has NULL partition column values so
-         * skip the check.
-         */
-        if (partitionColumn != NULL)
-        {
-            if (columnNulls[partitionColumn->varattno - 1])
-            {
-                ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
-                                errmsg("cannot copy row with NULL value "
-                                       "in partition column")));
-            }
-
-            partitionColumnValue = columnValues[partitionColumn->varattno - 1];
-        }
-
-        /*
-         * Find the shard interval and id for the partition column value for
-         * non-reference tables.
-         * For reference table, this function blindly returns the tables single
-         * shard.
-         */
-        shardInterval = FindShardInterval(partitionColumnValue,
-                                          shardIntervalCache,
-                                          shardCount, partitionMethod,
-                                          compareFunction, hashFunction,
-                                          useBinarySearch);
-
-        if (shardInterval == NULL)
-        {
-            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                            errmsg("could not find shard for partition column "
-                                   "value")));
-        }
-
-        shardId = shardInterval->shardId;
-
         MemoryContextSwitchTo(oldContext);

-        /* get existing connections to the shard placements, if any */
-        shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
-                                                   &shardConnectionsFound);
-        if (!shardConnectionsFound)
-        {
-            bool stopOnFailure = false;
-
-            if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
-            {
-                stopOnFailure = true;
-            }
-
-            /* open connections and initiate COPY on shard placements */
-            OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
-                                copyOutState->binary);
-
-            /* send copy binary headers to shard placements */
-            if (copyOutState->binary)
-            {
-                SendCopyBinaryHeaders(copyOutState, shardId,
-                                      shardConnections->connectionList);
-            }
-        }
-
-        /* replicate row to shard placements */
-        resetStringInfo(copyOutState->fe_msgbuf);
-        AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
-                          copyOutState, columnOutputFunctions);
-        SendCopyDataToAll(copyOutState->fe_msgbuf, shardId,
-                          shardConnections->connectionList);
+        dest->receiveSlot(tupleTableSlot, dest);

         processedRowCount += 1;
     }

+    EndCopyFrom(copyState);
+
     /* all lines have been copied, stop showing line number in errors */
     error_context_stack = errorCallback.previous;

-    shardConnectionsList = ShardConnectionList(shardConnectionHash);
-    foreach(shardConnectionsCell, shardConnectionsList)
-    {
-        ShardConnections *shardConnections = (ShardConnections *) lfirst(
-            shardConnectionsCell);
+    /* finish the COPY commands */
+    dest->rShutdown(dest);

-        /* send copy binary footers to all shard placements */
-        if (copyOutState->binary)
-        {
-            SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
-                                  shardConnections->connectionList);
-        }
-
-        /* close the COPY input on all shard placements */
-        EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
-    }
-
-    EndCopyFrom(copyState);
+    ExecDropSingleTupleTableSlot(tupleTableSlot);
+    FreeExecutorState(executorState);
     heap_close(distributedRelation, NoLock);

     /* mark failed placements as inactive */
diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source
index 16703c580..f96e53361 100644
--- a/src/test/regress/input/multi_copy.source
+++ b/src/test/regress/input/multi_copy.source
@@ -723,7 +723,7 @@ CREATE TABLE numbers_hash(a int, b int);
 SELECT create_distributed_table('numbers_hash', 'a');

 \c - - - :worker_1_port
-ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+ALTER TABLE numbers_hash_560180 DROP COLUMN b;
 \c - - - :master_port

 -- operation will fail to modify a shard and roll back
@@ -739,7 +739,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
 \.

 -- verify no row is inserted
-SELECT * FROM numbers_hash;
+SELECT count(a) FROM numbers_hash;

 -- verify shard is still marked as valid
 SELECT shardid, shardstate, nodename, nodeport
diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source
index 9b02124f6..e04c99149 100644
--- a/src/test/regress/output/multi_copy.source
+++ b/src/test/regress/output/multi_copy.source
@@ -978,17 +978,19 @@ SELECT create_distributed_table('numbers_hash', 'a');
 (1 row)

 \c - - - :worker_1_port
-ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+ALTER TABLE numbers_hash_560180 DROP COLUMN b;
 \c - - - :master_port
 -- operation will fail to modify a shard and roll back
 COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-ERROR:  row field count is 2, expected 3
-DETAIL:  (null)
+ERROR:  column "b" of relation "numbers_hash_560180" does not exist
+CONTEXT:  while executing command on localhost:57637
+COPY numbers_hash, line 1: "1,1"
 -- verify no row is inserted
-SELECT * FROM numbers_hash;
- a | b
----+---
-(0 rows)
+SELECT count(a) FROM numbers_hash;
+ count
+-------
+     0
+(1 row)

 -- verify shard is still marked as valid
 SELECT shardid, shardstate, nodename, nodeport
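
Note on the control flow this patch adopts: CopyToExistingShards still parses rows with
NextCopyFrom, but now hands each row to a DestReceiver instead of routing it to shard
connections itself, calling rStartup once, receiveSlot per row, and rShutdown at the end.
The sketch below is only an illustrative, self-contained model of that callback flow in
plain C; DestReceiverModel and TupleTableSlotModel are simplified stand-ins, not the
PostgreSQL definitions, and the comments describe what the real CitusCopyDestReceiver
does at each step per this patch.

/* minimal standalone model of the rStartup/receiveSlot/rShutdown flow */
#include <stdio.h>
#include <stdbool.h>

typedef struct TupleTableSlotModel
{
    int columnCount;
    const char **columnValues;  /* text values, for illustration only */
} TupleTableSlotModel;

typedef struct DestReceiverModel DestReceiverModel;
struct DestReceiverModel
{
    void (*rStartup) (DestReceiverModel *dest, int operation, int columnCount);
    bool (*receiveSlot) (TupleTableSlotModel *slot, DestReceiverModel *dest);
    void (*rShutdown) (DestReceiverModel *dest);
    long processedRowCount;
};

static void
CopyStartup(DestReceiverModel *dest, int operation, int columnCount)
{
    /* the real CitusCopyDestReceiver opens shard connections here */
    printf("startup: %d columns\n", columnCount);
}

static bool
CopyReceiveSlot(TupleTableSlotModel *slot, DestReceiverModel *dest)
{
    /* the real receiver routes the row to the matching shard placements */
    printf("row: %s\n", slot->columnValues[0]);
    dest->processedRowCount++;
    return true;
}

static void
CopyShutdown(DestReceiverModel *dest)
{
    /* the real receiver finishes the remote COPY commands on shutdown */
    printf("shutdown after %ld rows\n", dest->processedRowCount);
}

int
main(void)
{
    DestReceiverModel dest = { CopyStartup, CopyReceiveSlot, CopyShutdown, 0 };
    const char *rows[][1] = { { "1" }, { "2" }, { "3" } };
    TupleTableSlotModel slot = { 1, NULL };
    int rowIndex = 0;

    dest.rStartup(&dest, 0, slot.columnCount);

    /* stands in for the NextCopyFrom() loop in CopyToExistingShards */
    for (rowIndex = 0; rowIndex < 3; rowIndex++)
    {
        slot.columnValues = rows[rowIndex];
        dest.receiveSlot(&slot, &dest);
    }

    dest.rShutdown(&dest);
    return 0;
}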