diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 3a1539cbf..ab20aed5d 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -38,7 +38,7 @@ typedef struct ShardCopyDestReceiver DestReceiver pub; /* Destination Relation Name */ - char *destinationShardFullyQualifiedName; + List *destinationShardFullyQualifiedName; /* descriptor of the tuples that are sent to the worker */ TupleDesc tupleDescriptor; @@ -71,7 +71,7 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint64 destinationNodeId); -static StringInfo ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool +static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static bool ShouldSendCopyNow(StringInfo buffer); @@ -156,9 +156,12 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, copyOutState->fe_msgbuf->len)) { + char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to COPY to shard %s,", - copyDest->destinationShardFullyQualifiedName), + errmsg("Failed to COPY to shard %s.%s,", + destinationShardSchemaName,destinationShardRelationName ), errdetail("failed to send %d bytes %s on node %u", copyOutState->fe_msgbuf->len, copyOutState->fe_msgbuf->data, @@ -223,9 +226,12 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) /* end the COPY input */ if (!PutRemoteCopyEnd(copyDest->connection, NULL /* errormsg */)) { + char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to COPY to destination shard %s", - copyDest->destinationShardFullyQualifiedName))); + errmsg("Failed to COPY to destination shard %s.%s", + destinationShardSchemaName, destinationShardRelationName))); } /* check whether there were any COPY errors */ @@ -245,7 +251,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) DestReceiver * CreateShardCopyDestReceiver(EState *executorState, - char *destinationShardFullyQualifiedName, + List *destinationShardFullyQualifiedName, uint32_t destinationNodeId) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( @@ -293,11 +299,14 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) * for copying into a result table */ static StringInfo -ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryFormat) +ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat) { + char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); + StringInfo command = makeStringInfo(); - appendStringInfo(command, "COPY %s FROM STDIN", - destinationShardFullyQualifiedName); + appendStringInfo(command, "COPY %s.%s FROM STDIN", + destinationShardSchemaName, destinationShardRelationName); if (useBinaryFormat) { @@ -360,10 +369,8 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState */ LocalCopyBuffer = localCopyOutState->fe_msgbuf; - char *destinationShardSchemaName = NULL; - char *destinationShardRelationName = NULL; - DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName), - &destinationShardSchemaName, &destinationShardRelationName); + char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false /* missing_ok */); diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c index 7c22fa485..0dfc43c43 100644 --- a/src/backend/distributed/operations/worker_split_copy.c +++ b/src/backend/distributed/operations/worker_split_copy.c @@ -83,16 +83,12 @@ CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, char *destinationShardSchemaName = get_namespace_name(get_rel_namespace( splitCopyDest-> sourceShardRelationOid)); - char *destinationShardNameCopy = strdup(sourceShardNamePrefix); + char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix); AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId); - char *destinationShardFullyQualifiedName = - quote_qualified_identifier(destinationShardSchemaName, - destinationShardNameCopy); - DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( executorState, - destinationShardFullyQualifiedName, + list_make2(destinationShardSchemaName, destinationShardNameCopy), splitCopyInfo->destinationShardNodeId); shardCopyDests[index] = shardCopyDest; diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 0193a2827..e4419ed39 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -55,11 +55,13 @@ worker_split_copy(PG_FUNCTION_ARGS) shardIdToSplitCopy, splitCopyInfoList); + char *sourceShardToCopyName = generate_qualified_relation_name( + shardIntervalToSplitCopy->relationId); + AppendShardIdToName(&sourceShardToCopyName, shardIdToSplitCopy); + StringInfo selectShardQueryForCopy = makeStringInfo(); appendStringInfo(selectShardQueryForCopy, - "SELECT * FROM %s;", - generate_qualified_relation_name( - shardIntervalToSplitCopy->relationId)); + "SELECT * FROM %s;", sourceShardToCopyName); ParamListInfo params = NULL; ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h index 7c4a68f30..212ca6122 100644 --- a/src/include/distributed/worker_shard_copy.h +++ b/src/include/distributed/worker_shard_copy.h @@ -15,7 +15,7 @@ struct FullRelationName; extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState, - char *destinationShardFullyQualifiedName, + List *destinationShardFullyQualifiedName, uint32_t destinationNodeId); #endif /* WORKER_SHARD_COPY_H_ */