mirror of https://github.com/citusdata/citus.git
Address some of the PR feedback from Andres
parent
a326205bc2
commit
69ce21a42f
|
@ -315,10 +315,9 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||||
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
||||||
columnValues,columnNulls, NULL);
|
columnValues,columnNulls, NULL);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
|
|
||||||
if (!nextRowFound)
|
if (!nextRowFound)
|
||||||
{
|
{
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,9 +347,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||||
|
|
||||||
shardId = shardInterval->shardId;
|
shardId = shardInterval->shardId;
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
/* find the connections to the shard placements */
|
/* find the connections to the shard placements */
|
||||||
shardConnections = (ShardConnections *) hash_search(shardConnectionHash,
|
shardConnections = (ShardConnections *) hash_search(shardConnectionHash,
|
||||||
&shardInterval->shardId,
|
&shardId,
|
||||||
HASH_ENTER,
|
HASH_ENTER,
|
||||||
&found);
|
&found);
|
||||||
if (!found)
|
if (!found)
|
||||||
|
@ -394,17 +395,10 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||||
PrepareTransactions(connectionList);
|
PrepareTransactions(connectionList);
|
||||||
}
|
}
|
||||||
|
|
||||||
pfree(columnValues);
|
|
||||||
pfree(columnNulls);
|
|
||||||
|
|
||||||
FreeExecutorState(executorState);
|
|
||||||
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
CHECK_FOR_INTERRUPTS();
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
EndCopyFrom(copyState);
|
|
||||||
|
|
||||||
/* roll back all transactions */
|
/* roll back all transactions */
|
||||||
connectionList = ConnectionList(shardConnectionHash);
|
connectionList = ConnectionList(shardConnectionHash);
|
||||||
EndRemoteCopy(connectionList, false);
|
EndRemoteCopy(connectionList, false);
|
||||||
|
@ -749,7 +743,7 @@ SendCopyDataToPlacements(StringInfo dataBuffer, ShardConnections *shardConnectio
|
||||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
errmsg("Failed to COPY to shard %ld on %s:%s",
|
errmsg("failed to COPY to shard %ld on %s:%s",
|
||||||
shardId, nodeName, nodePort)));
|
shardId, nodeName, nodePort)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -817,7 +811,7 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure)
|
||||||
if (stopOnFailure)
|
if (stopOnFailure)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
errmsg("Failed to COPY to shard %ld on %s:%s",
|
errmsg("failed to COPY to shard %ld on %s:%s",
|
||||||
shardId, nodeName, nodePort)));
|
shardId, nodeName, nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -723,8 +723,8 @@ ParseWorkerNodeFile(const char *workerNodeFilename)
|
||||||
/* allocate worker node structure and set fields */
|
/* allocate worker node structure and set fields */
|
||||||
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
|
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
|
||||||
|
|
||||||
strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH + 1);
|
strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
|
||||||
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH + 1);
|
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
|
||||||
workerNode->workerPort = nodePort;
|
workerNode->workerPort = nodePort;
|
||||||
workerNode->inWorkerFile = true;
|
workerNode->inWorkerFile = true;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue