mirror of https://github.com/citusdata/citus.git
Implement RedistributeTaskListResult
parent d855faf2b2
commit 527d7d41c1
@@ -18,6 +18,7 @@
 #include "access/tupdesc.h"
 #include "catalog/pg_type.h"
 #include "distributed/intermediate_results.h"
+#include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/transaction_management.h"
@@ -29,6 +30,28 @@
 #include "utils/lsyscache.h"


+/*
+ * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
+ * It is a separate struct to use it as a key in a hash table.
+ */
+typedef struct NodePair
+{
+    int sourceNodeId;
+    int targetNodeId;
+} NodePair;
+
+
+/*
+ * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched from
+ * the source node to the destination node in the NodePair.
+ */
+typedef struct NodeToNodeFragmentsTransfer
+{
+    NodePair nodes;
+    List *fragmentList;
+} NodeToNodeFragmentsTransfer;
+
+
 /* forward declarations of local functions */
 static void WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
                                      DistTableCacheEntry *targetRelation,
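The comment on NodePair notes that it exists to serve as a hash-table key. Concretely, ColocationTransfers (added later in this commit) creates a dynahash keyed on the raw bytes of a NodePair; a condensed sketch of that setup, with the names and sizes taken from the hunk further below:

    HASHCTL transferHashInfo;
    MemSet(&transferHashInfo, 0, sizeof(HASHCTL));
    transferHashInfo.keysize = sizeof(NodePair);        /* the whole struct is the key */
    transferHashInfo.entrysize = sizeof(NodeToNodeFragmentsTransfer);
    transferHashInfo.hcxt = CurrentMemoryContext;

    /* HASH_BLOBS hashes and compares the key bytes directly, which is why NodePair
     * holds only two plain ints and nothing pointer- or padding-sensitive. */
    HTAB *transferHash = hash_create("Fragment Transfer Hash", 32, &transferHashInfo,
                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);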
@@ -46,6 +69,44 @@ static DistributedResultFragment * TupleToDistributedResultFragment(
 static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
                                                           TupleDesc resultDescriptor,
                                                           bool errorOnAnyFailure);
+static List ** ColocateFragmentsWithRelation(List *fragmentList,
+                                             DistTableCacheEntry *targetRelation);
+static List * ColocationTransfers(List *fragmentList,
+                                  DistTableCacheEntry *targetRelation);
+static List * FragmentTransferTaskList(List *fragmentListTransfers);
+static char * QueryStringForFragmentsTransfer(
+    NodeToNodeFragmentsTransfer *fragmentsTransfer);
+static void ExecuteFetchTaskList(List *fetchTaskList);
+
+
+/*
+ * RedistributeTaskListResults partitions the results of given task list using
+ * shard ranges and partition method of given targetRelation, and then colocates
+ * the result files with shards.
+ *
+ * If a shard has a replication factor > 1, corresponding result files are copied
+ * to all nodes containing that shard.
+ *
+ * returnValue[shardIndex] is list of cstrings each of which is a resultId which
+ * correspond to targetRelation->sortedShardIntervalArray[shardIndex].
+ */
+List **
+RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList,
+                            DistTableCacheEntry *targetRelation,
+                            bool binaryFormat)
+{
+    /*
+     * Make sure that this transaction has a distributed transaction ID.
+     *
+     * Intermediate results will be stored in a directory that is derived
+     * from the distributed transaction ID.
+     */
+    UseCoordinatedTransaction();
+
+    List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
+                                                  targetRelation, binaryFormat);
+    return ColocateFragmentsWithRelation(fragmentList, targetRelation);
+}


 /*
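RedistributeTaskListResults is the entry point this commit adds: it partitions the results of the select tasks and returns an array indexed by shard index, each cell holding the result IDs now colocated with that shard. A hypothetical caller sketch, not part of the commit; the "repartitioned_results" prefix and the selectTaskList/targetRelation variables are placeholders, and only the declarations visible in this diff plus PostgreSQL's list API are assumed:

    /* Redistribute the SELECT results so they are colocated with targetRelation's
     * shards, then look at which result files ended up with each shard. */
    List **shardResultIds = RedistributeTaskListResults("repartitioned_results",
                                                        selectTaskList,
                                                        targetRelation,
                                                        true /* binaryFormat */);

    int shardCount = targetRelation->shardIntervalArrayLength;
    for (int shardIndex = 0; shardIndex < shardCount; shardIndex++)
    {
        /* each cell is a result ID (cstring) colocated with
         * targetRelation->sortedShardIntervalArray[shardIndex] */
        List *resultIdList = shardResultIds[shardIndex];
        elog(DEBUG1, "shard index %d has %d colocated result file(s)",
             shardIndex, list_length(resultIdList));
    }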
@@ -64,6 +125,14 @@ PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList,
                          DistTableCacheEntry *targetRelation,
                          bool binaryFormat)
 {
+    if (targetRelation->partitionMethod != DISTRIBUTE_BY_HASH &&
+        targetRelation->partitionMethod != DISTRIBUTE_BY_RANGE)
+    {
+        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("repartitioning results of a tasklist is only supported "
+                               "when target relation is hash or range partitioned.")));
+    }
+
 /*
 * Make sure that this transaction has a distributed transaction ID.
 *
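The guard above restricts the feature to tables whose shards cover well-defined value ranges (hash or range distribution), since only those give a fixed shard index to map each fragment onto. As an illustration only (not from this commit), a hash-distributed target that would pass this check can be created with Citus' usual API; the table and column names are made up:

    -- 'hash' is the default distribution type for create_distributed_table
    SELECT create_distributed_table('target_table', 'dist_key');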
@@ -333,3 +402,222 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,

     return resultStore;
 }
+
+
+/*
+ * ColocateFragmentsWithRelation moves the fragments in the cluster so they are
+ * colocated with the shards of target relation. These transfers are done by
+ * calls to fetch_intermediate_results() between nodes.
+ *
+ * returnValue[shardIndex] is list of result Ids that are colocated with
+ * targetRelation->sortedShardIntervalArray[shardIndex] after fetch tasks are
+ * done.
+ */
+static List **
+ColocateFragmentsWithRelation(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+    List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
+    List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);
+
+    ExecuteFetchTaskList(fragmentTransferTaskList);
+
+    int shardCount = targetRelation->shardIntervalArrayLength;
+    List **shardResultIdList = palloc0(shardCount * sizeof(List *));
+
+    ListCell *fragmentCell = NULL;
+    foreach(fragmentCell, fragmentList)
+    {
+        DistributedResultFragment *sourceFragment = lfirst(fragmentCell);
+        int shardIndex = sourceFragment->targetShardIndex;
+
+        shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
+                                                sourceFragment->resultId);
+    }
+
+    return shardResultIdList;
+}
+
+
+/*
+ * ColocationTransfers returns a list of transfers to colocate given fragments with
+ * shards of the target relation. These transfers also take into account replicated
+ * target relations. This prunes away transfers with same source and target
+ */
+static List *
+ColocationTransfers(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+    HASHCTL transferHashInfo;
+    MemSet(&transferHashInfo, 0, sizeof(HASHCTL));
+    transferHashInfo.keysize = sizeof(NodePair);
+    transferHashInfo.entrysize = sizeof(NodeToNodeFragmentsTransfer);
+    transferHashInfo.hcxt = CurrentMemoryContext;
+    HTAB *transferHash = hash_create("Fragment Transfer Hash", 32, &transferHashInfo,
+                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+    ListCell *fragmentCell = NULL;
+    foreach(fragmentCell, fragmentList)
+    {
+        DistributedResultFragment *fragment = lfirst(fragmentCell);
+        List *placementList = FinalizedShardPlacementList(fragment->targetShardId);
+
+        ListCell *placementCell = NULL;
+        foreach(placementCell, placementList)
+        {
+            ShardPlacement *placement = lfirst(placementCell);
+            NodePair transferKey = {
+                .sourceNodeId = fragment->nodeId,
+                .targetNodeId = placement->nodeId
+            };
+
+            if (transferKey.sourceNodeId == transferKey.targetNodeId)
+            {
+                continue;
+            }
+
+            bool foundInCache = false;
+            NodeToNodeFragmentsTransfer *fragmentListTransfer =
+                hash_search(transferHash, &transferKey, HASH_ENTER, &foundInCache);
+            if (!foundInCache)
+            {
+                fragmentListTransfer->nodes = transferKey;
+                fragmentListTransfer->fragmentList = NIL;
+            }
+
+            fragmentListTransfer->fragmentList =
+                lappend(fragmentListTransfer->fragmentList, fragment);
+        }
+    }
+
+    List *fragmentListTransfers = NIL;
+    NodeToNodeFragmentsTransfer *transfer = NULL;
+    HASH_SEQ_STATUS hashSeqStatus;
+
+    hash_seq_init(&hashSeqStatus, transferHash);
+
+    while ((transfer = hash_seq_search(&hashSeqStatus)) != NULL)
+    {
+        fragmentListTransfers = lappend(fragmentListTransfers, transfer);
+    }
+
+    return fragmentListTransfers;
+}
+
+
+/*
+ * FragmentTransferTaskList returns a list of tasks which performs the given list of
+ * transfers. Each of the transfers are done by a SQL call to fetch_intermediate_results.
+ * See QueryStringForFragmentsTransfer for how the query is constructed.
+ */
+static List *
+FragmentTransferTaskList(List *fragmentListTransfers)
+{
+    List *fetchTaskList = NIL;
+    ListCell *transferCell = NULL;
+
+    foreach(transferCell, fragmentListTransfers)
+    {
+        NodeToNodeFragmentsTransfer *fragmentsTransfer = lfirst(transferCell);
+
+        int targetNodeId = fragmentsTransfer->nodes.targetNodeId;
+
+        /* these should have already been pruned away in ColocationTransfers */
+        Assert(targetNodeId != fragmentsTransfer->nodes.sourceNodeId);
+
+        WorkerNode *workerNode = LookupNodeByNodeId(targetNodeId);
+
+        ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
+        targetPlacement->nodeName = workerNode->workerName;
+        targetPlacement->nodePort = workerNode->workerPort;
+        targetPlacement->groupId = workerNode->groupId;
+
+        Task *task = CitusMakeNode(Task);
+        task->taskType = SELECT_TASK;
+        task->queryString = QueryStringForFragmentsTransfer(fragmentsTransfer);
+        task->taskPlacementList = list_make1(targetPlacement);
+
+        fetchTaskList = lappend(fetchTaskList, task);
+    }
+
+    return fetchTaskList;
+}
+
+
+/*
+ * QueryStringForFragmentsTransfer returns a query which fetches distributed
+ * result fragments from source node to target node. See the structure of
+ * NodeToNodeFragmentsTransfer for details of how these are decided.
+ */
+static char *
+QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer)
+{
+    ListCell *fragmentCell = NULL;
+    StringInfo queryString = makeStringInfo();
+    StringInfo fragmentNamesArrayString = makeStringInfo();
+    int fragmentCount = 0;
+    NodePair *nodePair = &fragmentsTransfer->nodes;
+    WorkerNode *sourceNode = LookupNodeByNodeId(nodePair->sourceNodeId);
+
+    appendStringInfoString(fragmentNamesArrayString, "ARRAY[");
+
+    foreach(fragmentCell, fragmentsTransfer->fragmentList)
+    {
+        DistributedResultFragment *fragment = lfirst(fragmentCell);
+        char *fragmentName = fragment->resultId;
+
+        if (fragmentCount > 0)
+        {
+            appendStringInfoString(fragmentNamesArrayString, ",");
+        }
+
+        appendStringInfoString(fragmentNamesArrayString,
+                               quote_literal_cstr(fragmentName));
+
+        fragmentCount++;
+    }
+
+    appendStringInfoString(fragmentNamesArrayString, "]::text[]");
+
+    appendStringInfo(queryString,
+                     "SELECT bytes FROM fetch_intermediate_results(%s,%s,%d) bytes",
+                     fragmentNamesArrayString->data,
+                     quote_literal_cstr(sourceNode->workerName),
+                     sourceNode->workerPort);
+
+    ereport(DEBUG3, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName,
+                            sourceNode->workerPort, queryString->data)));
+
+    return queryString->data;
+}
+
+
+/*
+ * ExecuteFetchTaskList executes a list of fetch_intermediate_results() tasks.
+ * It ignores the byte_count result of the fetch_intermediate_results() calls.
+ */
+static void
+ExecuteFetchTaskList(List *taskList)
+{
+    TupleDesc resultDescriptor = NULL;
+    Tuplestorestate *resultStore = NULL;
+    int resultColumnCount = 1;
+
+#if PG_VERSION_NUM >= 120000
+    resultDescriptor = CreateTemplateTupleDesc(resultColumnCount);
+#else
+    resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
+#endif
+
+    TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0);
+
+    bool errorOnAnyFailure = true;
+    resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
+                                                   errorOnAnyFailure);
+
+    TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
+                                                          &TTSOpsMinimalTuple);
+
+    while (tuplestore_gettupleslot(resultStore, true, false, slot))
+    {
+        ExecClearTuple(slot);
+    }
+}
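To make the transfer mechanism concrete: each NodeToNodeFragmentsTransfer becomes one task, placed on the target node, whose query string expands to a single fetch_intermediate_results() call naming every fragment to pull from the source node. A hypothetical expansion of queryString->data; the result IDs, host, and port are made up:

    SELECT bytes FROM fetch_intermediate_results(
        ARRAY['repartitioned_results_3_1','repartitioned_results_5_1']::text[],
        '10.0.0.15', 5432) bytes

Because the task runs on the target placement, the target node pulls the named fragments from the source over a single connection, which is also what the UnclaimConnection() change in the next hunk cleans up after.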
@@ -847,6 +847,8 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
         totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId);
     }

+    UnclaimConnection(connection);
+
     PG_RETURN_INT64(totalBytesWritten);
 }
@@ -234,7 +234,8 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
 static bool
 CitusIsVolatileFunctionIdChecker(Oid func_id, void *context)
 {
-    if (func_id == CitusReadIntermediateResultFuncId())
+    if (func_id == CitusReadIntermediateResultFuncId() ||
+        func_id == CitusReadIntermediateResultArrayFuncId())
     {
         return false;
     }
@@ -273,7 +274,8 @@ CitusIsVolatileFunction(Node *node)
 static bool
 CitusIsMutableFunctionIdChecker(Oid func_id, void *context)
 {
-    if (func_id == CitusReadIntermediateResultFuncId())
+    if (func_id == CitusReadIntermediateResultFuncId() ||
+        func_id == CitusReadIntermediateResultArrayFuncId())
     {
         return false;
     }
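These two checkers are used by citus_evaluate_expr (named in the hunk header) to decide which function calls must be evaluated before a query is shipped; returning false keeps intermediate-result readers out of that set, since the result files only exist on the workers. The commit extends the existing special case for the scalar reader to the array-taking variant behind CitusReadIntermediateResultArrayFuncId (its SQL-level name is not shown in this diff). For reference, a call to the scalar reader looks like the following; the result ID and column list are made up:

    SELECT count(*)
    FROM read_intermediate_result('repartitioned_results_3_1', 'binary')
             AS res (dist_key int, value text);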
@@ -60,6 +60,10 @@ extern char * QueryResultFileName(const char *resultId);
 extern char * CreateIntermediateResultsDirectory(void);

 /* distributed_intermediate_results.c */
+extern List ** RedistributeTaskListResults(char *resultIdPrefix,
+                                           List *selectTaskList,
+                                           DistTableCacheEntry *targetRelation,
+                                           bool binaryFormat);
 extern List * PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList,
                                        DistTableCacheEntry *distributionScheme,
                                        bool binaryFormat);