Hello World Local Copy works

pull/6029/head
Nitish Upreti 2022-06-20 11:53:22 -07:00
parent 58facee439
commit 91ca1fb0b4
4 changed files with 29 additions and 24 deletions

View File

@ -38,7 +38,7 @@ typedef struct ShardCopyDestReceiver
DestReceiver pub;
/* Destination Relation Name */
char *destinationShardFullyQualifiedName;
List *destinationShardFullyQualifiedName;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
@ -71,7 +71,7 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation,
static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool CanUseLocalCopy(uint64 destinationNodeId);
static StringInfo ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool
static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool
useBinaryFormat);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
static bool ShouldSendCopyNow(StringInfo buffer);
@ -156,9 +156,12 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data,
copyOutState->fe_msgbuf->len))
{
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to shard %s,",
copyDest->destinationShardFullyQualifiedName),
errmsg("Failed to COPY to shard %s.%s,",
destinationShardSchemaName,destinationShardRelationName ),
errdetail("failed to send %d bytes %s on node %u",
copyOutState->fe_msgbuf->len,
copyOutState->fe_msgbuf->data,
@ -223,9 +226,12 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
/* end the COPY input */
if (!PutRemoteCopyEnd(copyDest->connection, NULL /* errormsg */))
{
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to destination shard %s",
copyDest->destinationShardFullyQualifiedName)));
errmsg("Failed to COPY to destination shard %s.%s",
destinationShardSchemaName, destinationShardRelationName)));
}
/* check whether there were any COPY errors */
@ -245,7 +251,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
DestReceiver *
CreateShardCopyDestReceiver(EState *executorState,
char *destinationShardFullyQualifiedName,
List *destinationShardFullyQualifiedName,
uint32_t destinationNodeId)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0(
@ -293,11 +299,14 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
* for copying into a result table
*/
static StringInfo
ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryFormat)
ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat)
{
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
StringInfo command = makeStringInfo();
appendStringInfo(command, "COPY %s FROM STDIN",
destinationShardFullyQualifiedName);
appendStringInfo(command, "COPY %s.%s FROM STDIN",
destinationShardSchemaName, destinationShardRelationName);
if (useBinaryFormat)
{
@ -360,10 +369,8 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
*/
LocalCopyBuffer = localCopyOutState->fe_msgbuf;
char *destinationShardSchemaName = NULL;
char *destinationShardRelationName = NULL;
DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName),
&destinationShardSchemaName, &destinationShardRelationName);
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName,
false /* missing_ok */);

View File

@ -83,16 +83,12 @@ CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(
splitCopyDest->
sourceShardRelationOid));
char *destinationShardNameCopy = strdup(sourceShardNamePrefix);
char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix);
AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId);
char *destinationShardFullyQualifiedName =
quote_qualified_identifier(destinationShardSchemaName,
destinationShardNameCopy);
DestReceiver *shardCopyDest = CreateShardCopyDestReceiver(
executorState,
destinationShardFullyQualifiedName,
list_make2(destinationShardSchemaName, destinationShardNameCopy),
splitCopyInfo->destinationShardNodeId);
shardCopyDests[index] = shardCopyDest;

View File

@ -55,11 +55,13 @@ worker_split_copy(PG_FUNCTION_ARGS)
shardIdToSplitCopy,
splitCopyInfoList);
char *sourceShardToCopyName = generate_qualified_relation_name(
shardIntervalToSplitCopy->relationId);
AppendShardIdToName(&sourceShardToCopyName, shardIdToSplitCopy);
StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;",
generate_qualified_relation_name(
shardIntervalToSplitCopy->relationId));
"SELECT * FROM %s;", sourceShardToCopyName);
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,

View File

@ -15,7 +15,7 @@
struct FullRelationName;
extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState,
char *destinationShardFullyQualifiedName,
List *destinationShardFullyQualifiedName,
uint32_t destinationNodeId);
#endif /* WORKER_SHARD_COPY_H_ */