Refactor CopyLocalDataIntoShards (#3693)

This PR:
- Declares variables when they are needed.
- Creates DoCopyFromLocalTableIntoShards for better readability.
- Doesn't use a hardcoded value, instead use a variable for better
readability.
pull/3731/head
SaitTalhaNisanci 2020-04-10 09:25:26 +03:00 committed by GitHub
parent d99043fe0c
commit 07f9a442b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 50 additions and 40 deletions

View File

@ -75,6 +75,10 @@
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/inval.h" #include "utils/inval.h"
/*
* once every LOG_PER_TUPLE_AMOUNT, the copy will be logged.
*/
#define LOG_PER_TUPLE_AMOUNT 1000000
/* Replication model to use when creating distributed tables */ /* Replication model to use when creating distributed tables */
int ReplicationModel = REPLICATION_MODEL_COORDINATOR; int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
@ -107,6 +111,10 @@ static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
Var *distributionColumn); Var *distributionColumn);
static bool RelationUsesHeapAccessMethodOrNone(Relation relation); static bool RelationUsesHeapAccessMethodOrNone(Relation relation);
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty); static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
DestReceiver *copyDest,
TupleTableSlot *slot,
EState *estate);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table); PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -1205,26 +1213,8 @@ RegularTable(Oid relationId)
static void static void
CopyLocalDataIntoShards(Oid distributedRelationId) CopyLocalDataIntoShards(Oid distributedRelationId)
{ {
DestReceiver *copyDest = NULL;
List *columnNameList = NIL;
Relation distributedRelation = NULL;
TupleDesc tupleDescriptor = NULL;
Var *partitionColumn = NULL;
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
bool stopOnFailure = true;
EState *estate = NULL;
#if PG_VERSION_NUM >= PG_VERSION_12
TableScanDesc scan = NULL;
#else
HeapScanDesc scan = NULL;
#endif
HeapTuple tuple = NULL;
MemoryContext oldContext = NULL;
uint64 rowsCopied = 0;
/* take an ExclusiveLock to block all operations except SELECT */ /* take an ExclusiveLock to block all operations except SELECT */
distributedRelation = heap_open(distributedRelationId, ExclusiveLock); Relation distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
/* /*
* Skip copying from partitioned tables, we will copy the data from * Skip copying from partitioned tables, we will copy the data from
@ -1249,24 +1239,27 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
PushActiveSnapshot(GetLatestSnapshot()); PushActiveSnapshot(GetLatestSnapshot());
/* get the table columns */ /* get the table columns */
tupleDescriptor = RelationGetDescr(distributedRelation); TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDescriptor, TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDescriptor,
&TTSOpsHeapTuple); &TTSOpsHeapTuple);
columnNameList = TupleDescColumnNameList(tupleDescriptor); List *columnNameList = TupleDescColumnNameList(tupleDescriptor);
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
/* determine the partition column in the tuple descriptor */ /* determine the partition column in the tuple descriptor */
partitionColumn = PartitionColumn(distributedRelationId, 0); Var *partitionColumn = PartitionColumn(distributedRelationId, 0);
if (partitionColumn != NULL) if (partitionColumn != NULL)
{ {
partitionColumnIndex = partitionColumn->varattno - 1; partitionColumnIndex = partitionColumn->varattno - 1;
} }
/* initialise per-tuple memory context */ /* initialise per-tuple memory context */
estate = CreateExecutorState(); EState *estate = CreateExecutorState();
ExprContext *econtext = GetPerTupleExprContext(estate); ExprContext *econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot; econtext->ecxt_scantuple = slot;
copyDest = bool stopOnFailure = true;
DestReceiver *copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList, columnNameList,
partitionColumnIndex, partitionColumnIndex,
@ -1276,15 +1269,43 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
/* initialise state for writing to shards, we'll open connections on demand */ /* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor); copyDest->rStartup(copyDest, 0, tupleDescriptor);
DoCopyFromLocalTableIntoShards(distributedRelation, copyDest, slot, estate);
/* finish writing into the shards */
copyDest->rShutdown(copyDest);
copyDest->rDestroy(copyDest);
/* free memory and close the relation */
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
heap_close(distributedRelation, NoLock);
PopActiveSnapshot();
}
/*
* DoCopyFromLocalTableIntoShards performs a copy operation
* from local tables into shards.
*/
static void
DoCopyFromLocalTableIntoShards(Relation distributedRelation,
DestReceiver *copyDest,
TupleTableSlot *slot,
EState *estate)
{
/* begin reading from local table */ /* begin reading from local table */
#if PG_VERSION_NUM >= PG_VERSION_12 #if PG_VERSION_NUM >= PG_VERSION_12
scan = table_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); TableScanDesc scan = table_beginscan(distributedRelation, GetActiveSnapshot(), 0,
NULL);
#else #else
scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); HeapScanDesc scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
#endif #endif
oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
uint64 rowsCopied = 0;
HeapTuple tuple = NULL;
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{ {
/* materialize tuple and send it to a shard */ /* materialize tuple and send it to a shard */
@ -1308,13 +1329,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
rowsCopied++; rowsCopied++;
if (rowsCopied % 1000000 == 0) if (rowsCopied % LOG_PER_TUPLE_AMOUNT == 0)
{ {
ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied))); ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
} }
} }
if (rowsCopied % 1000000 != 0) if (rowsCopied % LOG_PER_TUPLE_AMOUNT != 0)
{ {
ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied))); ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
} }
@ -1327,17 +1348,6 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
#else #else
heap_endscan(scan); heap_endscan(scan);
#endif #endif
/* finish writing into the shards */
copyDest->rShutdown(copyDest);
copyDest->rDestroy(copyDest);
/* free memory and close the relation */
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
heap_close(distributedRelation, NoLock);
PopActiveSnapshot();
} }