diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index f98dd83a7..68fc0b848 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1627,6 +1627,10 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS } +/* + * CitusCopyDestReceiverStartup implements the rStartup interface of + * CitusCopyDestReceiver. It opens the relation and sets up the copy state. + */ static void CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) @@ -1646,7 +1650,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, ListCell *columnNameCell = NULL; char partitionMethod = '\0'; - Var *partitionColumn = NULL; + Var *partitionColumn = PartitionColumn(tableId, 0); int partitionColumnIndex = -1; DistTableCacheEntry *cacheEntry = NULL; @@ -1664,17 +1668,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, partitionMethod = cacheEntry->partitionMethod; copyDest->distributedRelation = distributedRelation; - copyDest->partitionMethod = partitionMethod; + copyDest->tupleDescriptor = inputTupleDescriptor; + /* we don't support copy to reference tables from workers */ if (partitionMethod == DISTRIBUTE_BY_NONE) { - /* we don't support copy to reference tables from workers */ EnsureCoordinator(); } - else - { - partitionColumn = PartitionColumn(tableId, 0); - } /* load the list of shards and verify that we have shards to copy into */ shardIntervalList = LoadShardIntervalList(tableId); @@ -1698,10 +1698,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } } - /* prevent concurrent placement changes and non-commutative DML statements */ - LockShardListMetadata(shardIntervalList, ShareLock); - LockShardListResources(shardIntervalList, ShareLock); - /* error if any shard missing min/max values */ if (partitionMethod != DISTRIBUTE_BY_NONE && cacheEntry->hasUninitializedShardInterval) @@ -1713,12 
+1709,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, relationName))); } - copyDest->hashFunction = cacheEntry->hashFunction; - copyDest->compareFunction = cacheEntry->shardIntervalCompareFunction; + /* prevent concurrent placement changes and non-commutative DML statements */ + LockShardListMetadata(shardIntervalList, ShareLock); + LockShardListResources(shardIntervalList, ShareLock); - /* initialize the shard interval cache */ - copyDest->shardCount = cacheEntry->shardIntervalArrayLength; - copyDest->shardIntervalCache = cacheEntry->sortedShardIntervalArray; + /* keep the table metadata to avoid looking it up for every tuple */ + copyDest->tableMetadata = cacheEntry; /* determine whether to use binary search */ if (partitionMethod != DISTRIBUTE_BY_HASH || !cacheEntry->hasUniformHashDistribution) @@ -1726,6 +1722,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->useBinarySearch = true; } + if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC) + { + CoordinatedTransactionUse2PC(); + } + /* define how tuples will be serialised */ copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); copyOutState->delim = (char *) delimiterCharacter; @@ -1736,12 +1737,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState); copyDest->copyOutState = copyOutState; - copyDest->tupleDescriptor = inputTupleDescriptor; - /* prepare output functions */ copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); + /* find the partition column index in the column list */ foreach(columnNameCell, columnNameList) { char *columnName = (char *) lfirst(columnNameCell); @@ -1780,10 +1780,15 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyStatement->options = NIL; copyDest->copyStatement = copyStatement; - copyDest->copyConnectionHash = 
CreateShardConnectionHash(TopTransactionContext); + copyDest->shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); } +/* + * CitusCopyDestReceiverReceive implements the receiveSlot function of + * CitusCopyDestReceiver. It takes a TupleTableSlot and sends the contents to + * the appropriate shard placement(s). + */ #if PG_VERSION_NUM >= 90600 static bool #else @@ -1793,19 +1798,20 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; - char partitionMethod = copyDest->partitionMethod; + DistTableCacheEntry *tableMetadata = copyDest->tableMetadata; + char partitionMethod = tableMetadata->partitionMethod; int partitionColumnIndex = copyDest->partitionColumnIndex; TupleDesc tupleDescriptor = copyDest->tupleDescriptor; CopyStmt *copyStatement = copyDest->copyStatement; - int shardCount = copyDest->shardCount; - ShardInterval **shardIntervalCache = copyDest->shardIntervalCache; + int shardCount = tableMetadata->shardIntervalArrayLength; + ShardInterval **shardIntervalCache = tableMetadata->sortedShardIntervalArray; bool useBinarySearch = copyDest->useBinarySearch; - FmgrInfo *hashFunction = copyDest->hashFunction; - FmgrInfo *compareFunction = copyDest->compareFunction; + FmgrInfo *hashFunction = tableMetadata->hashFunction; + FmgrInfo *compareFunction = tableMetadata->shardIntervalCompareFunction; - HTAB *copyConnectionHash = copyDest->copyConnectionHash; + HTAB *shardConnectionHash = copyDest->shardConnectionHash; CopyOutState copyOutState = copyDest->copyOutState; FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; @@ -1881,7 +1887,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) MemoryContextSwitchTo(copyDest->memoryContext); /* get existing connections to the shard placements, if any */ - shardConnections = GetShardHashConnections(copyConnectionHash, shardId, + shardConnections = 
GetShardHashConnections(shardConnectionHash, shardId, &shardConnectionsFound); if (!shardConnectionsFound) { @@ -1911,12 +1917,17 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) } +/* + * CitusCopyDestReceiverShutdown implements the rShutdown interface of + * CitusCopyDestReceiver. It ends the COPY on all the open connections and closes + * the relation. + */ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver; - HTAB *shardConnectionHash = copyDest->copyConnectionHash; + HTAB *shardConnectionHash = copyDest->shardConnectionHash; List *shardConnectionsList = NIL; ListCell *shardConnectionsCell = NULL; CopyOutState copyOutState = copyDest->copyOutState; diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 7b1c6dd04..cd3b56b73 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -14,6 +14,7 @@ #include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "tcop/dest.h" @@ -50,11 +51,23 @@ typedef struct NodeAddress /* CopyDestReceiver can be used to stream results into a distributed table */ typedef struct CitusCopyDestReceiver { + /* public DestReceiver interface */ DestReceiver pub; /* relation and columns to which to copy */ Oid distributedRelationId; List *columnNameList; + int partitionColumnIndex; + + /* distributed table metadata */ + DistTableCacheEntry *tableMetadata; + bool useBinarySearch; + + /* open relation handle */ + Relation distributedRelation; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; /* EState for per-tuple memory allocation */ EState *executorState; @@ -62,26 +75,11 @@ typedef struct CitusCopyDestReceiver /* MemoryContext for DestReceiver session */ MemoryContext memoryContext; - /* distributed relation 
details */ - Relation distributedRelation; - char partitionMethod; - int partitionColumnIndex; - - /* descriptor of the tuples that are sent to the worker */ - TupleDesc tupleDescriptor; - /* template for COPY statement to send to workers */ CopyStmt *copyStatement; /* cached shard metadata for pruning */ - int shardCount; - ShardInterval **shardIntervalCache; - bool useBinarySearch; - FmgrInfo *hashFunction; - FmgrInfo *compareFunction; - - /* cached shard metadata for pruning */ - HTAB *copyConnectionHash; + HTAB *shardConnectionHash; bool stopOnFailure; /* state on how to copy out data types */