Address review feedback in COPY refactoring

pull/1117/head
Marco Slot 2017-02-28 17:23:56 +01:00
parent d74fb764b1
commit db98c28354
2 changed files with 52 additions and 43 deletions

View File

@ -1627,6 +1627,10 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS
}
/*
* CitusCopyDestReceiverStartup implements the rStartup interface of
* CitusCopyDestReceiver. It opens the relation
*/
static void
CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor)
@ -1646,7 +1650,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
ListCell *columnNameCell = NULL;
char partitionMethod = '\0';
Var *partitionColumn = NULL;
Var *partitionColumn = PartitionColumn(tableId, 0);
int partitionColumnIndex = -1;
DistTableCacheEntry *cacheEntry = NULL;
@ -1664,17 +1668,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
partitionMethod = cacheEntry->partitionMethod;
copyDest->distributedRelation = distributedRelation;
copyDest->partitionMethod = partitionMethod;
copyDest->tupleDescriptor = inputTupleDescriptor;
/* we don't support copy to reference tables from workers */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
/* we don't support copy to reference tables from workers */
EnsureCoordinator();
}
else
{
partitionColumn = PartitionColumn(tableId, 0);
}
/* load the list of shards and verify that we have shards to copy into */
shardIntervalList = LoadShardIntervalList(tableId);
@ -1698,10 +1698,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}
}
/* prevent concurrent placement changes and non-commutative DML statements */
LockShardListMetadata(shardIntervalList, ShareLock);
LockShardListResources(shardIntervalList, ShareLock);
/* error if any shard missing min/max values */
if (partitionMethod != DISTRIBUTE_BY_NONE &&
cacheEntry->hasUninitializedShardInterval)
@ -1713,12 +1709,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
relationName)));
}
copyDest->hashFunction = cacheEntry->hashFunction;
copyDest->compareFunction = cacheEntry->shardIntervalCompareFunction;
/* prevent concurrent placement changes and non-commutative DML statements */
LockShardListMetadata(shardIntervalList, ShareLock);
LockShardListResources(shardIntervalList, ShareLock);
/* initialize the shard interval cache */
copyDest->shardCount = cacheEntry->shardIntervalArrayLength;
copyDest->shardIntervalCache = cacheEntry->sortedShardIntervalArray;
/* keep the table metadata to avoid looking it up for every tuple */
copyDest->tableMetadata = cacheEntry;
/* determine whether to use binary search */
if (partitionMethod != DISTRIBUTE_BY_HASH || !cacheEntry->hasUniformHashDistribution)
@ -1726,6 +1722,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->useBinarySearch = true;
}
if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
{
CoordinatedTransactionUse2PC();
}
/* define how tuples will be serialised */
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
@ -1736,12 +1737,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
copyDest->copyOutState = copyOutState;
copyDest->tupleDescriptor = inputTupleDescriptor;
/* prepare output functions */
copyDest->columnOutputFunctions =
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
/* find the partition column index in the column list */
foreach(columnNameCell, columnNameList)
{
char *columnName = (char *) lfirst(columnNameCell);
@ -1780,10 +1780,15 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyStatement->options = NIL;
copyDest->copyStatement = copyStatement;
copyDest->copyConnectionHash = CreateShardConnectionHash(TopTransactionContext);
copyDest->shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
}
/*
* CitusCopyDestReceiverReceive implements the receiveSlot function of
* CitusCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
* the appropriate shard placement(s).
*/
#if PG_VERSION_NUM >= 90600
static bool
#else
@ -1793,19 +1798,20 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
char partitionMethod = copyDest->partitionMethod;
DistTableCacheEntry *tableMetadata = copyDest->tableMetadata;
char partitionMethod = tableMetadata->partitionMethod;
int partitionColumnIndex = copyDest->partitionColumnIndex;
TupleDesc tupleDescriptor = copyDest->tupleDescriptor;
CopyStmt *copyStatement = copyDest->copyStatement;
int shardCount = copyDest->shardCount;
ShardInterval **shardIntervalCache = copyDest->shardIntervalCache;
int shardCount = tableMetadata->shardIntervalArrayLength;
ShardInterval **shardIntervalCache = tableMetadata->sortedShardIntervalArray;
bool useBinarySearch = copyDest->useBinarySearch;
FmgrInfo *hashFunction = copyDest->hashFunction;
FmgrInfo *compareFunction = copyDest->compareFunction;
FmgrInfo *hashFunction = tableMetadata->hashFunction;
FmgrInfo *compareFunction = tableMetadata->shardIntervalCompareFunction;
HTAB *copyConnectionHash = copyDest->copyConnectionHash;
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
CopyOutState copyOutState = copyDest->copyOutState;
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
@ -1881,7 +1887,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
MemoryContextSwitchTo(copyDest->memoryContext);
/* get existing connections to the shard placements, if any */
shardConnections = GetShardHashConnections(copyConnectionHash, shardId,
shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
&shardConnectionsFound);
if (!shardConnectionsFound)
{
@ -1911,12 +1917,17 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
}
/*
* CitusCopyDestReceiverShutdown implements the rShutdown interface of
* CitusCopyDestReceiver. It ends the COPY on all the open connections and closes
* the relation.
*/
static void
CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver;
HTAB *shardConnectionHash = copyDest->copyConnectionHash;
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
List *shardConnectionsList = NIL;
ListCell *shardConnectionsCell = NULL;
CopyOutState copyOutState = copyDest->copyOutState;

View File

@ -14,6 +14,7 @@
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
@ -50,11 +51,23 @@ typedef struct NodeAddress
/* CopyDestReceiver can be used to stream results into a distributed table */
typedef struct CitusCopyDestReceiver
{
/* public DestReceiver interface */
DestReceiver pub;
/* relation and columns to which to copy */
Oid distributedRelationId;
List *columnNameList;
int partitionColumnIndex;
/* distributed table metadata */
DistTableCacheEntry *tableMetadata;
bool useBinarySearch;
/* open relation handle */
Relation distributedRelation;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
/* EState for per-tuple memory allocation */
EState *executorState;
@ -62,26 +75,11 @@ typedef struct CitusCopyDestReceiver
/* MemoryContext for DestReceiver session */
MemoryContext memoryContext;
/* distributed relation details */
Relation distributedRelation;
char partitionMethod;
int partitionColumnIndex;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
/* template for COPY statement to send to workers */
CopyStmt *copyStatement;
/* cached shard metadata for pruning */
int shardCount;
ShardInterval **shardIntervalCache;
bool useBinarySearch;
FmgrInfo *hashFunction;
FmgrInfo *compareFunction;
/* cached shard metadata for pruning */
HTAB *copyConnectionHash;
HTAB *shardConnectionHash;
bool stopOnFailure;
/* state on how to copy out data types */