m3hm3t/add-freeze-copy
Mehmet YILMAZ 2024-09-09 21:18:34 +00:00
parent d32e7263ae
commit d13ecedcfa
1 changed files with 77 additions and 69 deletions

View File

@ -83,8 +83,9 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState); localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static List *CreateCopyOptions(bool isBinaryCopy); static List * CreateCopyOptions(bool isBinaryCopy);
static StringInfo ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName); static StringInfo ConstructShardTruncateStatement(
List *destinationShardFullyQualifiedName);
static bool static bool
@ -114,8 +115,9 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
SetupReplicationOriginRemoteSession(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection);
// Construct and send the TRUNCATE statement to the remote node /* Construct and send the TRUNCATE statement to the remote node */
StringInfo truncateStatement = ConstructShardTruncateStatement(copyDest->destinationShardFullyQualifiedName); StringInfo truncateStatement = ConstructShardTruncateStatement(
copyDest->destinationShardFullyQualifiedName);
if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
{ {
@ -127,9 +129,9 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
{ {
ReportResultError(copyDest->connection, truncateResult, ERROR); ReportResultError(copyDest->connection, truncateResult, ERROR);
} }
PQclear(truncateResult); PQclear(truncateResult);
// Construct the COPY command and send it to the remote node /* Construct the COPY command and send it to the remote node */
StringInfo copyStatement = ConstructShardCopyStatement( StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName, copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary, copyDest->copyOutState->binary,
@ -478,31 +480,31 @@ ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName)
*/ */
static StringInfo static StringInfo
ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
useBinaryFormat, useBinaryFormat,
TupleDesc tupleDesc) TupleDesc tupleDesc)
{ {
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
StringInfo command = makeStringInfo(); StringInfo command = makeStringInfo();
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
quote_identifier(destinationShardSchemaName), quote_identifier(destinationShardSchemaName),
quote_identifier(destinationShardRelationName), quote_identifier(destinationShardRelationName),
columnList); columnList);
if (useBinaryFormat) if (useBinaryFormat)
{ {
appendStringInfo(command, " WITH (format binary, FREEZE);"); appendStringInfo(command, " WITH (format binary, FREEZE);");
} }
else else
{ {
appendStringInfo(command, " WITH (FREEZE);"); appendStringInfo(command, " WITH (FREEZE);");
} }
return command; return command;
} }
@ -542,20 +544,21 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
static List * static List *
CreateCopyOptions(bool isBinaryCopy) CreateCopyOptions(bool isBinaryCopy)
{ {
List *options = NIL; List *options = NIL;
// Add the FREEZE option /* Add the FREEZE option */
DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1); DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1);
options = lappend(options, freezeOption); options = lappend(options, freezeOption);
// If binary format is used, add the binary format option /* If binary format is used, add the binary format option */
if (isBinaryCopy) if (isBinaryCopy)
{ {
DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"),
options = lappend(options, binaryFormatOption); -1);
} options = lappend(options, binaryFormatOption);
}
return options; return options;
} }
@ -565,49 +568,54 @@ CreateCopyOptions(bool isBinaryCopy)
static void static void
LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState)
{ {
bool isBinaryCopy = localCopyOutState->binary; bool isBinaryCopy = localCopyOutState->binary;
if (isBinaryCopy) if (isBinaryCopy)
{ {
AppendCopyBinaryFooters(localCopyOutState); AppendCopyBinaryFooters(localCopyOutState);
} }
/* /*
* Set the buffer as a global variable to allow ReadFromLocalBufferCallback * Set the buffer as a global variable to allow ReadFromLocalBufferCallback
* to read from it. We cannot pass additional arguments to * to read from it. We cannot pass additional arguments to
* ReadFromLocalBufferCallback. * ReadFromLocalBufferCallback.
*/ */
LocalCopyBuffer = localCopyOutState->fe_msgbuf; LocalCopyBuffer = localCopyOutState->fe_msgbuf;
// Extract schema and relation names /* Extract schema and relation names */
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); char *destinationShardSchemaName = linitial(
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(
copyDest->destinationShardFullyQualifiedName);
// Get OIDs for schema and shard /* Get OIDs for schema and shard */
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false);
Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); Oid destinationShardOid = get_relname_relid(destinationShardRelationName,
destinationSchemaOid);
// Create options list for COPY command /* Create options list for COPY command */
List *options = CreateCopyOptions(isBinaryCopy); List *options = CreateCopyOptions(isBinaryCopy);
// Open the shard relation /* Open the shard relation */
Relation shard = table_open(destinationShardOid, RowExclusiveLock); Relation shard = table_open(destinationShardOid, RowExclusiveLock);
// Create and configure parse state /* Create and configure parse state */
ParseState *pState = make_parsestate(NULL); ParseState *pState = make_parsestate(NULL);
(void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false); (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false,
false);
// Begin and execute the COPY FROM operation /* Begin and execute the COPY FROM operation */
CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options); CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false,
CopyFrom(cstate); ReadFromLocalBufferCallback, NULL, options);
EndCopyFrom(cstate); CopyFrom(cstate);
EndCopyFrom(cstate);
// Reset the local copy buffer /* Reset the local copy buffer */
resetStringInfo(localCopyOutState->fe_msgbuf); resetStringInfo(localCopyOutState->fe_msgbuf);
// Close the shard relation and free parse state /* Close the shard relation and free parse state */
table_close(shard, NoLock); table_close(shard, NoLock);
free_parsestate(pState); free_parsestate(pState);
} }