Use CitusCopyDestReceiver for regular COPY

pull/1117/head
Marco Slot 2017-01-25 14:14:17 +01:00
parent d11eca7d4a
commit d74fb764b1
3 changed files with 58 additions and 206 deletions

View File

@ -287,49 +287,31 @@ static void
CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
{ {
Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char *relationName = get_rel_name(tableId);
CitusCopyDestReceiver *copyDest = NULL;
DestReceiver *dest = NULL;
Relation distributedRelation = NULL; Relation distributedRelation = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
uint32 columnCount = 0; uint32 columnCount = 0;
Datum *columnValues = NULL; Datum *columnValues = NULL;
bool *columnNulls = NULL; bool *columnNulls = NULL;
FmgrInfo *hashFunction = NULL; int columnIndex = 0;
FmgrInfo *compareFunction = NULL; List *columnNameList = NIL;
bool hasUniformHashDistribution = false; TupleTableSlot *tupleTableSlot = NULL;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
int shardCount = 0;
List *shardIntervalList = NULL;
ShardInterval **shardIntervalCache = NULL;
bool useBinarySearch = false;
HTAB *shardConnectionHash = NULL;
ShardConnections *shardConnections = NULL;
List *shardConnectionsList = NIL;
ListCell *shardConnectionsCell = NULL;
EState *executorState = NULL; EState *executorState = NULL;
MemoryContext executorTupleContext = NULL; MemoryContext executorTupleContext = NULL;
ExprContext *executorExpressionContext = NULL; ExprContext *executorExpressionContext = NULL;
char partitionMethod = 0;
bool stopOnFailure = false;
CopyState copyState = NULL; CopyState copyState = NULL;
CopyOutState copyOutState = NULL;
FmgrInfo *columnOutputFunctions = NULL;
uint64 processedRowCount = 0; uint64 processedRowCount = 0;
Var *partitionColumn = PartitionColumn(tableId, 0);
char partitionMethod = PartitionMethod(tableId);
ErrorContextCallback errorCallback; ErrorContextCallback errorCallback;
/* get hash function for partition column */
hashFunction = cacheEntry->hashFunction;
/* get compare function for shard intervals */
compareFunction = cacheEntry->shardIntervalCompareFunction;
/* allocate column values and nulls arrays */ /* allocate column values and nulls arrays */
distributedRelation = heap_open(tableId, RowExclusiveLock); distributedRelation = heap_open(tableId, RowExclusiveLock);
tupleDescriptor = RelationGetDescr(distributedRelation); tupleDescriptor = RelationGetDescr(distributedRelation);
@ -337,64 +319,40 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
columnValues = palloc0(columnCount * sizeof(Datum)); columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool)); columnNulls = palloc0(columnCount * sizeof(bool));
/* we don't support copy to reference tables from workers */ /* set up a virtual tuple table slot */
tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor);
tupleTableSlot->tts_nvalid = columnCount;
tupleTableSlot->tts_values = columnValues;
tupleTableSlot->tts_isnull = columnNulls;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
char *columnName = NameStr(currentColumn->attname);
if (currentColumn->attisdropped)
{
continue;
}
columnNameList = lappend(columnNameList, columnName);
}
executorState = CreateExecutorState();
executorTupleContext = GetPerTupleMemoryContext(executorState);
executorExpressionContext = GetPerTupleExprContext(executorState);
partitionMethod = PartitionMethod(tableId);
if (partitionMethod == DISTRIBUTE_BY_NONE) if (partitionMethod == DISTRIBUTE_BY_NONE)
{ {
EnsureCoordinator(); stopOnFailure = true;
} }
/* load the list of shards and verify that we have shards to copy into */ /* set up the destination for the COPY */
shardIntervalList = LoadShardIntervalList(tableId); copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState,
if (shardIntervalList == NIL) stopOnFailure);
{ dest = (DestReceiver *) copyDest;
if (partitionMethod == DISTRIBUTE_BY_HASH) dest->rStartup(dest, 0, tupleDescriptor);
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName),
errhint("Run master_create_worker_shards to create shards "
"and try again.")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName)));
}
}
/* error if any shard missing min/max values for non reference tables */
if (partitionMethod != DISTRIBUTE_BY_NONE &&
cacheEntry->hasUninitializedShardInterval)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not start copy"),
errdetail("Distributed relation \"%s\" has shards "
"with missing shardminvalue/shardmaxvalue.",
relationName)));
}
/* prevent concurrent placement changes and non-commutative DML statements */
LockShardListMetadata(shardIntervalList, ShareLock);
LockShardListResources(shardIntervalList, ShareLock);
/* initialize the shard interval cache */
shardCount = cacheEntry->shardIntervalArrayLength;
shardIntervalCache = cacheEntry->sortedShardIntervalArray;
hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution;
/* determine whether to use binary search */
if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution)
{
useBinarySearch = true;
}
if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
{
CoordinatedTransactionUse2PC();
}
/* initialize copy state to read from COPY data source */ /* initialize copy state to read from COPY data source */
copyState = BeginCopyFrom(distributedRelation, copyState = BeginCopyFrom(distributedRelation,
@ -403,30 +361,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
copyStatement->attlist, copyStatement->attlist,
copyStatement->options); copyStatement->options);
executorState = CreateExecutorState();
executorTupleContext = GetPerTupleMemoryContext(executorState);
executorExpressionContext = GetPerTupleExprContext(executorState);
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* create a mapping of shard id to a connection for each of its placements */
shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
/*
* From here on we use copyStatement as the template for the command
* that we send to workers. This command does not have an attribute
* list since NextCopyFrom will generate a value for all columns.
*/
copyStatement->attlist = NIL;
/* set up callback to identify error line number */ /* set up callback to identify error line number */
errorCallback.callback = CopyFromErrorCallback; errorCallback.callback = CopyFromErrorCallback;
errorCallback.arg = (void *) copyState; errorCallback.arg = (void *) copyState;
@ -436,10 +370,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
while (true) while (true)
{ {
bool nextRowFound = false; bool nextRowFound = false;
Datum partitionColumnValue = 0;
ShardInterval *shardInterval = NULL;
int64 shardId = 0;
bool shardConnectionsFound = false;
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
ResetPerTupleExprContext(executorState); ResetPerTupleExprContext(executorState);
@ -458,103 +388,23 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/*
* Find the partition column value and corresponding shard interval
* for non-reference tables.
* Get the existing (and only a single) shard interval for the reference
* tables. Note that reference tables have NULL partition column values, so
* skip the check.
*/
if (partitionColumn != NULL)
{
if (columnNulls[partitionColumn->varattno - 1])
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot copy row with NULL value "
"in partition column")));
}
partitionColumnValue = columnValues[partitionColumn->varattno - 1];
}
/*
* Find the shard interval and id for the partition column value for
* non-reference tables.
* For a reference table, this function blindly returns the table's single
* shard.
*/
shardInterval = FindShardInterval(partitionColumnValue,
shardIntervalCache,
shardCount, partitionMethod,
compareFunction, hashFunction,
useBinarySearch);
if (shardInterval == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find shard for partition column "
"value")));
}
shardId = shardInterval->shardId;
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
/* get existing connections to the shard placements, if any */ dest->receiveSlot(tupleTableSlot, dest);
shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
&shardConnectionsFound);
if (!shardConnectionsFound)
{
bool stopOnFailure = false;
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
stopOnFailure = true;
}
/* open connections and initiate COPY on shard placements */
OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
copyOutState->binary);
/* send copy binary headers to shard placements */
if (copyOutState->binary)
{
SendCopyBinaryHeaders(copyOutState, shardId,
shardConnections->connectionList);
}
}
/* replicate row to shard placements */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions);
SendCopyDataToAll(copyOutState->fe_msgbuf, shardId,
shardConnections->connectionList);
processedRowCount += 1; processedRowCount += 1;
} }
EndCopyFrom(copyState);
/* all lines have been copied, stop showing line number in errors */ /* all lines have been copied, stop showing line number in errors */
error_context_stack = errorCallback.previous; error_context_stack = errorCallback.previous;
shardConnectionsList = ShardConnectionList(shardConnectionHash); /* finish the COPY commands */
foreach(shardConnectionsCell, shardConnectionsList) dest->rShutdown(dest);
{
ShardConnections *shardConnections = (ShardConnections *) lfirst(
shardConnectionsCell);
/* send copy binary footers to all shard placements */ ExecDropSingleTupleTableSlot(tupleTableSlot);
if (copyOutState->binary) FreeExecutorState(executorState);
{
SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
shardConnections->connectionList);
}
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
}
EndCopyFrom(copyState);
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
/* mark failed placements as inactive */ /* mark failed placements as inactive */

View File

@ -723,7 +723,7 @@ CREATE TABLE numbers_hash(a int, b int);
SELECT create_distributed_table('numbers_hash', 'a'); SELECT create_distributed_table('numbers_hash', 'a');
\c - - - :worker_1_port \c - - - :worker_1_port
ALTER TABLE numbers_hash_560180 ADD COLUMN c int; ALTER TABLE numbers_hash_560180 DROP COLUMN b;
\c - - - :master_port \c - - - :master_port
-- operation will fail to modify a shard and roll back -- operation will fail to modify a shard and roll back
@ -739,7 +739,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
\. \.
-- verify no row is inserted -- verify no row is inserted
SELECT * FROM numbers_hash; SELECT count(a) FROM numbers_hash;
-- verify shard is still marked as valid -- verify shard is still marked as valid
SELECT shardid, shardstate, nodename, nodeport SELECT shardid, shardstate, nodename, nodeport

View File

@ -978,17 +978,19 @@ SELECT create_distributed_table('numbers_hash', 'a');
(1 row) (1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
ALTER TABLE numbers_hash_560180 ADD COLUMN c int; ALTER TABLE numbers_hash_560180 DROP COLUMN b;
\c - - - :master_port \c - - - :master_port
-- operation will fail to modify a shard and roll back -- operation will fail to modify a shard and roll back
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
ERROR: row field count is 2, expected 3 ERROR: column "b" of relation "numbers_hash_560180" does not exist
DETAIL: (null) CONTEXT: while executing command on localhost:57637
COPY numbers_hash, line 1: "1,1"
-- verify no row is inserted -- verify no row is inserted
SELECT * FROM numbers_hash; SELECT count(a) FROM numbers_hash;
a | b count
---+--- -------
(0 rows) 0
(1 row)
-- verify shard is still marked as valid -- verify shard is still marked as valid
SELECT shardid, shardstate, nodename, nodeport SELECT shardid, shardstate, nodename, nodeport