revert local shard changes

m3hm3t/add-freeze-copy
Mehmet YILMAZ 2024-09-10 18:40:24 +00:00
parent 842f5e486f
commit 4db4a1ee9f
1 changed files with 19 additions and 39 deletions

View File

@ -83,7 +83,6 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState); localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static List * CreateCopyOptions(bool isBinaryCopy);
static StringInfo ConstructShardTruncateStatement( static StringInfo ConstructShardTruncateStatement(
List *destinationShardFullyQualifiedName); List *destinationShardFullyQualifiedName);
@ -548,26 +547,6 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
} }
/*
* CreateCopyOptions creates the options list for the COPY command.
*/
static List *
CreateCopyOptions(bool isBinaryCopy)
{
List *options = NIL;
/* If binary format is used, add the binary format option */
if (isBinaryCopy)
{
DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"),
-1);
options = lappend(options, binaryFormatOption);
}
return options;
}
/* /*
* LocalCopyToShard performs local copy for the given destination shard. * LocalCopyToShard performs local copy for the given destination shard.
*/ */
@ -575,7 +554,6 @@ static void
LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState)
{ {
bool isBinaryCopy = localCopyOutState->binary; bool isBinaryCopy = localCopyOutState->binary;
if (isBinaryCopy) if (isBinaryCopy)
{ {
AppendCopyBinaryFooters(localCopyOutState); AppendCopyBinaryFooters(localCopyOutState);
@ -588,38 +566,40 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
*/ */
LocalCopyBuffer = localCopyOutState->fe_msgbuf; LocalCopyBuffer = localCopyOutState->fe_msgbuf;
/* Extract schema and relation names */
char *destinationShardSchemaName = linitial( char *destinationShardSchemaName = linitial(
copyDest->destinationShardFullyQualifiedName); copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond( char *destinationShardRelationName = lsecond(
copyDest->destinationShardFullyQualifiedName); copyDest->destinationShardFullyQualifiedName);
/* Get OIDs for schema and shard */ Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName,
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); false /* missing_ok */);
Oid destinationShardOid = get_relname_relid(destinationShardRelationName, Oid destinationShardOid = get_relname_relid(destinationShardRelationName,
destinationSchemaOid); destinationSchemaOid);
/* Create options list for COPY command */ DefElem *binaryFormatOption = NULL;
List *options = CreateCopyOptions(isBinaryCopy); if (isBinaryCopy)
{
binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1);
}
/* Open the shard relation */
Relation shard = table_open(destinationShardOid, RowExclusiveLock); Relation shard = table_open(destinationShardOid, RowExclusiveLock);
ParseState *pState = make_parsestate(NULL /* parentParseState */);
(void) addRangeTableEntryForRelation(pState, shard, AccessShareLock,
NULL /* alias */, false /* inh */,
false /* inFromCl */);
/* Create and configure parse state */ List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL;
ParseState *pState = make_parsestate(NULL); CopyFromState cstate = BeginCopyFrom(pState, shard,
(void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, NULL /* whereClause */,
false); NULL /* fileName */,
false /* is_program */,
/* Begin and execute the COPY FROM operation */ ReadFromLocalBufferCallback,
CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, NULL /* attlist (NULL is all columns) */,
ReadFromLocalBufferCallback, NULL, options); options);
CopyFrom(cstate); CopyFrom(cstate);
EndCopyFrom(cstate); EndCopyFrom(cstate);
/* Reset the local copy buffer */
resetStringInfo(localCopyOutState->fe_msgbuf); resetStringInfo(localCopyOutState->fe_msgbuf);
/* Close the shard relation and free parse state */
table_close(shard, NoLock); table_close(shard, NoLock);
free_parsestate(pState); free_parsestate(pState);
} }