mirror of https://github.com/citusdata/citus.git
Merge pull request #3355 from citusdata/redistribute_results
Redistribute task list results to correspond to a target relation's distribution
pull/3372/head
commit
c7efbf9711
@@ -18,6 +18,7 @@
 #include "access/tupdesc.h"
 #include "catalog/pg_type.h"
 #include "distributed/intermediate_results.h"
+#include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/transaction_management.h"
@@ -29,6 +30,28 @@
 #include "utils/lsyscache.h"
+
+
+/*
+ * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
+ * It is a separate struct to use it as a key in a hash table.
+ */
+typedef struct NodePair
+{
+	int sourceNodeId;
+	int targetNodeId;
+} NodePair;
+
+
+/*
+ * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched from
+ * the source node to the destination node in the NodePair.
+ */
+typedef struct NodeToNodeFragmentsTransfer
+{
+	NodePair nodes;
+	List *fragmentList;
+} NodeToNodeFragmentsTransfer;
 
 
 /* forward declarations of local functions */
 static void WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
 									 DistTableCacheEntry *targetRelation,
@@ -46,6 +69,44 @@ static DistributedResultFragment * TupleToDistributedResultFragment(
 static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
 														  TupleDesc resultDescriptor,
 														  bool errorOnAnyFailure);
+static List ** ColocateFragmentsWithRelation(List *fragmentList,
+											 DistTableCacheEntry *targetRelation);
+static List * ColocationTransfers(List *fragmentList,
+								  DistTableCacheEntry *targetRelation);
+static List * FragmentTransferTaskList(List *fragmentListTransfers);
+static char * QueryStringForFragmentsTransfer(
+	NodeToNodeFragmentsTransfer *fragmentsTransfer);
+static void ExecuteFetchTaskList(List *fetchTaskList);
+
+
+/*
+ * RedistributeTaskListResults partitions the results of the given task list
+ * using the shard ranges and partition method of the given targetRelation,
+ * and then colocates the result files with shards.
+ *
+ * If a shard has a replication factor > 1, corresponding result files are copied
+ * to all nodes containing that shard.
+ *
+ * returnValue[shardIndex] is a list of cstrings, each of which is a resultId that
+ * corresponds to targetRelation->sortedShardIntervalArray[shardIndex].
+ */
+List **
+RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList,
+							DistTableCacheEntry *targetRelation,
+							bool binaryFormat)
+{
+	/*
+	 * Make sure that this transaction has a distributed transaction ID.
+	 *
+	 * Intermediate results will be stored in a directory that is derived
+	 * from the distributed transaction ID.
+	 */
+	UseCoordinatedTransaction();
+
+	List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
+												  targetRelation, binaryFormat);
+	return ColocateFragmentsWithRelation(fragmentList, targetRelation);
+}
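Note (illustration, not part of the patch): the returnValue contract above is easiest to see from a caller's perspective. The sketch below shows how one shard's colocated result ids could be turned into a read_intermediate_results() source query, which is roughly what the higher-level repartitioning code needs per shard. BuildFragmentSourceQuery is a hypothetical name used only for this example.

/*
 * Hypothetical helper (illustration only): builds a query string that reads
 * all result files colocated with one shard, i.e. the list found at
 * returnValue[shardIndex] after RedistributeTaskListResults().
 */
static char *
BuildFragmentSourceQuery(List *resultIdList, bool binaryFormat)
{
	StringInfo query = makeStringInfo();
	ListCell *resultIdCell = NULL;
	bool firstResultId = true;

	appendStringInfoString(query, "SELECT * FROM read_intermediate_results(ARRAY[");

	foreach(resultIdCell, resultIdList)
	{
		char *resultId = lfirst(resultIdCell);

		if (!firstResultId)
		{
			appendStringInfoString(query, ",");
		}

		appendStringInfoString(query, quote_literal_cstr(resultId));
		firstResultId = false;
	}

	/* the format argument must match the binaryFormat used when partitioning */
	appendStringInfo(query, "]::text[], %s)",
					 binaryFormat ? "'binary'" : "'text'");

	/* the caller still appends an alias with a column definition list matching the fragments' schema */
	return query->data;
}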
 
 
 /*
@@ -64,6 +125,14 @@ PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList,
 						 DistTableCacheEntry *targetRelation,
 						 bool binaryFormat)
 {
+	if (targetRelation->partitionMethod != DISTRIBUTE_BY_HASH &&
+		targetRelation->partitionMethod != DISTRIBUTE_BY_RANGE)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("repartitioning results of a tasklist is only supported "
+							   "when target relation is hash or range partitioned.")));
+	}
+
 	/*
 	 * Make sure that this transaction has a distributed transaction ID.
 	 *
@@ -333,3 +402,222 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
 
 	return resultStore;
 }
+
+
+/*
+ * ColocateFragmentsWithRelation moves the fragments in the cluster so they are
+ * colocated with the shards of the target relation. These transfers are done by
+ * calls to fetch_intermediate_results() between nodes.
+ *
+ * returnValue[shardIndex] is a list of result Ids that are colocated with
+ * targetRelation->sortedShardIntervalArray[shardIndex] after fetch tasks are
+ * done.
+ */
+static List **
+ColocateFragmentsWithRelation(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+	List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
+	List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);
+
+	ExecuteFetchTaskList(fragmentTransferTaskList);
+
+	int shardCount = targetRelation->shardIntervalArrayLength;
+	List **shardResultIdList = palloc0(shardCount * sizeof(List *));
+
+	ListCell *fragmentCell = NULL;
+	foreach(fragmentCell, fragmentList)
+	{
+		DistributedResultFragment *sourceFragment = lfirst(fragmentCell);
+		int shardIndex = sourceFragment->targetShardIndex;
+
+		shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
+												sourceFragment->resultId);
+	}
+
+	return shardResultIdList;
+}
+
+
+/*
+ * ColocationTransfers returns a list of transfers to colocate the given fragments
+ * with shards of the target relation. These transfers also take into account
+ * replicated target relations. This prunes away transfers with the same source
+ * and target node.
+ */
+static List *
+ColocationTransfers(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+	HASHCTL transferHashInfo;
+	MemSet(&transferHashInfo, 0, sizeof(HASHCTL));
+	transferHashInfo.keysize = sizeof(NodePair);
+	transferHashInfo.entrysize = sizeof(NodeToNodeFragmentsTransfer);
+	transferHashInfo.hcxt = CurrentMemoryContext;
+	HTAB *transferHash = hash_create("Fragment Transfer Hash", 32, &transferHashInfo,
+									 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	ListCell *fragmentCell = NULL;
+	foreach(fragmentCell, fragmentList)
+	{
+		DistributedResultFragment *fragment = lfirst(fragmentCell);
+		List *placementList = FinalizedShardPlacementList(fragment->targetShardId);
+
+		ListCell *placementCell = NULL;
+		foreach(placementCell, placementList)
+		{
+			ShardPlacement *placement = lfirst(placementCell);
+			NodePair transferKey = {
+				.sourceNodeId = fragment->nodeId,
+				.targetNodeId = placement->nodeId
+			};
+
+			if (transferKey.sourceNodeId == transferKey.targetNodeId)
+			{
+				continue;
+			}
+
+			bool foundInCache = false;
+			NodeToNodeFragmentsTransfer *fragmentListTransfer =
+				hash_search(transferHash, &transferKey, HASH_ENTER, &foundInCache);
+			if (!foundInCache)
+			{
+				fragmentListTransfer->nodes = transferKey;
+				fragmentListTransfer->fragmentList = NIL;
+			}
+
+			fragmentListTransfer->fragmentList =
+				lappend(fragmentListTransfer->fragmentList, fragment);
+		}
+	}
+
+	List *fragmentListTransfers = NIL;
+	NodeToNodeFragmentsTransfer *transfer = NULL;
+	HASH_SEQ_STATUS hashSeqStatus;
+
+	hash_seq_init(&hashSeqStatus, transferHash);
+
+	while ((transfer = hash_seq_search(&hashSeqStatus)) != NULL)
+	{
+		fragmentListTransfers = lappend(fragmentListTransfers, transfer);
+	}
+
+	return fragmentListTransfers;
+}
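Note (illustration, not part of the patch): a small worked example of the pruning and replication handling described in the comment above, with made-up node ids.

/*
 * Worked example (illustrative only): with shard replication factor 2, a
 * fragment written on node 1 whose target shard has placements on nodes 1
 * and 2 produces the candidate NodePairs (1 -> 1) and (1 -> 2). The local
 * pair (1 -> 1) is skipped, so a single NodeToNodeFragmentsTransfer with
 * nodes = {1, 2} is created (or found) in transferHash and the fragment is
 * appended to its fragmentList; every fragment moving between the same two
 * nodes ends up in that one transfer.
 */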
+
+
+/*
+ * FragmentTransferTaskList returns a list of tasks that perform the given list of
+ * transfers. Each transfer is done by a SQL call to fetch_intermediate_results.
+ * See QueryStringForFragmentsTransfer for how the query is constructed.
+ */
+static List *
+FragmentTransferTaskList(List *fragmentListTransfers)
+{
+	List *fetchTaskList = NIL;
+	ListCell *transferCell = NULL;
+
+	foreach(transferCell, fragmentListTransfers)
+	{
+		NodeToNodeFragmentsTransfer *fragmentsTransfer = lfirst(transferCell);
+
+		int targetNodeId = fragmentsTransfer->nodes.targetNodeId;
+
+		/* these should have already been pruned away in ColocationTransfers */
+		Assert(targetNodeId != fragmentsTransfer->nodes.sourceNodeId);
+
+		WorkerNode *workerNode = LookupNodeByNodeId(targetNodeId);
+
+		ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
+		targetPlacement->nodeName = workerNode->workerName;
+		targetPlacement->nodePort = workerNode->workerPort;
+		targetPlacement->groupId = workerNode->groupId;
+
+		Task *task = CitusMakeNode(Task);
+		task->taskType = SELECT_TASK;
+		task->queryString = QueryStringForFragmentsTransfer(fragmentsTransfer);
+		task->taskPlacementList = list_make1(targetPlacement);
+
+		fetchTaskList = lappend(fetchTaskList, task);
+	}
+
+	return fetchTaskList;
+}
+
+
+/*
+ * QueryStringForFragmentsTransfer returns a query which fetches distributed
+ * result fragments from source node to target node. See the structure of
+ * NodeToNodeFragmentsTransfer for details of how these are decided.
+ */
+static char *
+QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer)
+{
+	ListCell *fragmentCell = NULL;
+	StringInfo queryString = makeStringInfo();
+	StringInfo fragmentNamesArrayString = makeStringInfo();
+	int fragmentCount = 0;
+	NodePair *nodePair = &fragmentsTransfer->nodes;
+	WorkerNode *sourceNode = LookupNodeByNodeId(nodePair->sourceNodeId);
+
+	appendStringInfoString(fragmentNamesArrayString, "ARRAY[");
+
+	foreach(fragmentCell, fragmentsTransfer->fragmentList)
+	{
+		DistributedResultFragment *fragment = lfirst(fragmentCell);
+		char *fragmentName = fragment->resultId;
+
+		if (fragmentCount > 0)
+		{
+			appendStringInfoString(fragmentNamesArrayString, ",");
+		}
+
+		appendStringInfoString(fragmentNamesArrayString,
+							   quote_literal_cstr(fragmentName));
+
+		fragmentCount++;
+	}
+
+	appendStringInfoString(fragmentNamesArrayString, "]::text[]");
+
+	appendStringInfo(queryString,
+					 "SELECT bytes FROM fetch_intermediate_results(%s,%s,%d) bytes",
+					 fragmentNamesArrayString->data,
+					 quote_literal_cstr(sourceNode->workerName),
+					 sourceNode->workerPort);
+
+	ereport(DEBUG3, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName,
+							sourceNode->workerPort, queryString->data)));
+
+	return queryString->data;
+}
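Note (illustration, not part of the patch): with fragment names borrowed from the regression test below, and a placeholder host and port for the source node, the constructed statement looks roughly like this.

/*
 * Example output (illustrative):
 *
 *   SELECT bytes FROM fetch_intermediate_results(
 *       ARRAY['test_from_4213581_to_0','test_from_4213582_to_0']::text[],
 *       'localhost',57637) bytes
 *
 * The statement runs on the target node of the transfer and pulls the named
 * result files from the source node into the target node's intermediate
 * results directory.
 */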
+
+
+/*
+ * ExecuteFetchTaskList executes a list of fetch_intermediate_results() tasks.
+ * It ignores the byte_count result of the fetch_intermediate_results() calls.
+ */
+static void
+ExecuteFetchTaskList(List *taskList)
+{
+	TupleDesc resultDescriptor = NULL;
+	Tuplestorestate *resultStore = NULL;
+	int resultColumnCount = 1;
+
+#if PG_VERSION_NUM >= 120000
+	resultDescriptor = CreateTemplateTupleDesc(resultColumnCount);
+#else
+	resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
+#endif
+
+	TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0);
+
+	bool errorOnAnyFailure = true;
+	resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
+												   errorOnAnyFailure);
+
+	TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
+														  &TTSOpsMinimalTuple);
+
+	while (tuplestore_gettupleslot(resultStore, true, false, slot))
+	{
+		ExecClearTuple(slot);
+	}
+}
@@ -847,6 +847,8 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
 		totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId);
 	}
 
+	UnclaimConnection(connection);
+
 	PG_RETURN_INT64(totalBytesWritten);
 }
@@ -18,15 +18,18 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 
+#include "catalog/pg_type.h"
 #include "distributed/commands/multi_copy.h"
 #include "distributed/connection_management.h"
 #include "distributed/intermediate_results.h"
 #include "distributed/multi_executor.h"
 #include "distributed/remote_commands.h"
 #include "distributed/tuplestore.h"
+#include "distributed/listutils.h"
 #include "tcop/tcopprot.h"
 
 PG_FUNCTION_INFO_V1(partition_task_list_results);
+PG_FUNCTION_INFO_V1(redistribute_task_list_results);
 
 /*
  * partition_task_list_results partitions results of each of distributed
@@ -89,3 +92,78 @@ partition_task_list_results(PG_FUNCTION_ARGS)
 
 	PG_RETURN_DATUM(0);
 }
+
+
+/*
+ * redistribute_task_list_results exposes RedistributeTaskListResults for testing.
+ * It executes a query and repartitions and colocates its results according to
+ * a relation.
+ */
+Datum
+redistribute_task_list_results(PG_FUNCTION_ARGS)
+{
+	text *resultIdPrefixText = PG_GETARG_TEXT_P(0);
+	char *resultIdPrefix = text_to_cstring(resultIdPrefixText);
+	text *queryText = PG_GETARG_TEXT_P(1);
+	char *queryString = text_to_cstring(queryText);
+	Oid relationId = PG_GETARG_OID(2);
+	bool binaryFormat = PG_GETARG_BOOL(3);
+
+	Query *parsedQuery = ParseQueryString(queryString, NULL, 0);
+	PlannedStmt *queryPlan = pg_plan_query(parsedQuery,
+										   CURSOR_OPT_PARALLEL_OK,
+										   NULL);
+	if (!IsCitusCustomScan(queryPlan->planTree))
+	{
+		ereport(ERROR, (errmsg("query must be distributed and shouldn't require "
+							   "any merging on the coordinator.")));
+	}
+
+	CustomScan *customScan = (CustomScan *) queryPlan->planTree;
+	DistributedPlan *distributedPlan = GetDistributedPlan(customScan);
+
+	Job *job = distributedPlan->workerJob;
+	List *taskList = job->taskList;
+
+	DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(relationId);
+	List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList,
+														targetRelation, binaryFormat);
+
+	TupleDesc tupleDescriptor = NULL;
+	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
+	int shardCount = targetRelation->shardIntervalArrayLength;
+
+	for (int shardIndex = 0; shardIndex < shardCount; shardIndex++)
+	{
+		ShardInterval *shardInterval =
+			targetRelation->sortedShardIntervalArray[shardIndex];
+		uint64 shardId = shardInterval->shardId;
+
+		int fragmentCount = list_length(shardResultIds[shardIndex]);
+		Datum *resultIdValues = palloc0(fragmentCount * sizeof(Datum));
+		List *sortedResultIds = SortList(shardResultIds[shardIndex], pg_qsort_strcmp);
+
+		ListCell *resultIdCell = NULL;
+		int resultIdIndex = 0;
+		foreach(resultIdCell, sortedResultIds)
+		{
+			char *resultId = lfirst(resultIdCell);
+			resultIdValues[resultIdIndex++] = CStringGetTextDatum(resultId);
+		}
+
+		ArrayType *resultIdArray = DatumArrayToArrayType(resultIdValues, fragmentCount,
+														 TEXTOID);
+
+		bool columnNulls[2] = { 0 };
+		Datum columnValues[2] = {
+			Int64GetDatum(shardId),
+			PointerGetDatum(resultIdArray)
+		};
+
+		tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls);
+	}
+
+	tuplestore_donestoring(tupleStore);
+
+	PG_RETURN_DATUM(0);
+}
@@ -234,7 +234,8 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
 static bool
 CitusIsVolatileFunctionIdChecker(Oid func_id, void *context)
 {
-	if (func_id == CitusReadIntermediateResultFuncId())
+	if (func_id == CitusReadIntermediateResultFuncId() ||
+		func_id == CitusReadIntermediateResultArrayFuncId())
 	{
 		return false;
 	}
@@ -273,7 +274,8 @@ CitusIsVolatileFunction(Node *node)
 static bool
 CitusIsMutableFunctionIdChecker(Oid func_id, void *context)
 {
-	if (func_id == CitusReadIntermediateResultFuncId())
+	if (func_id == CitusReadIntermediateResultFuncId() ||
+		func_id == CitusReadIntermediateResultArrayFuncId())
 	{
 		return false;
 	}
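Note (illustration, not part of the patch): both checkers match PostgreSQL's check_function_callback signature, so the usual wiring is through check_functions_in_node(); the wrapper name below is hypothetical and only shows the pattern.

/*
 * Sketch (illustration only): check_functions_in_node() invokes the supplied
 * callback for every function OID referenced by the given node and returns
 * true as soon as the callback returns true. Because the checkers above
 * return false for read_intermediate_result() and the new array variant,
 * expressions over fetched fragments are not flagged as volatile or mutable
 * merely for calling those functions.
 */
static bool
NodeContainsVolatileFunctionSketch(Node *node)
{
	return check_functions_in_node(node, CitusIsVolatileFunctionIdChecker, NULL);
}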
@@ -60,6 +60,10 @@ extern char * QueryResultFileName(const char *resultId);
 extern char * CreateIntermediateResultsDirectory(void);
 
 /* distributed_intermediate_results.c */
+extern List ** RedistributeTaskListResults(char *resultIdPrefix,
+										   List *selectTaskList,
+										   DistTableCacheEntry *targetRelation,
+										   bool binaryFormat);
 extern List * PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList,
 									   DistTableCacheEntry *distributionScheme,
 									   bool binaryFormat);
@@ -76,6 +76,9 @@ s/_id_other_column_ref_fkey/_id_fkey/g
 # intermediate_results
 s/(ERROR.*)pgsql_job_cache\/([0-9]+_[0-9]+_[0-9]+)\/(.*).data/\1pgsql_job_cache\/xx_x_xxx\/\3.data/g
 
+# toast tables
+s/pg_toast_[0-9]+/pg_toast_xxxxx/g
+
 # Plan numbers are not very stable, so we normalize those
 # subplan numbers are quite stable so we keep those
 s/DEBUG: Plan [0-9]+/DEBUG: Plan XXX/g
@@ -2,12 +2,26 @@
 CREATE SCHEMA distributed_intermediate_results;
 SET search_path TO 'distributed_intermediate_results';
 SET citus.next_shard_id TO 4213581;
+SET citus.shard_replication_factor TO 1;
+-- redistribute_task_list_results tests the internal RedistributeTaskListResults
+CREATE OR REPLACE FUNCTION pg_catalog.redistribute_task_list_results(resultIdPrefix text,
+                                                                     query text,
+                                                                     target_table regclass,
+                                                                     binaryFormat bool DEFAULT true)
+    RETURNS TABLE(shardid bigint,
+                  colocated_results text[])
+    LANGUAGE C STRICT VOLATILE
+    AS 'citus', $$redistribute_task_list_results$$;
 --
--- We don't have extensive tests for partition_task_results, since it will be
--- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.
+-- We don't have extensive tests for partition_task_results or
+-- redistribute_task_list_results, since they will be tested by higher level
+-- "INSERT/SELECT with repartitioning" tests anyway.
 --
 --
--- partition_task_list_results, hash partitioning, binary format
+-- Case 1.
+-- hash partitioning, binary format
+-- * partition_task_list_results
+-- * redistribute_task_list_results
 --
 CREATE TABLE source_table(a int);
 SET citus.shard_count TO 3;
@@ -26,6 +40,16 @@ SELECT create_distributed_table('target_table', 'a');
 
 (1 row)
 
+CREATE TABLE colocated_with_target(a int);
+SELECT create_distributed_table('colocated_with_target', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- one value per shard, so we can route calls to read_intermediate_shards
+INSERT INTO colocated_with_target VALUES (1), (2);
+-- partition_task_list_results
 -- should error out
 SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table');
 ERROR: query must be distributed and shouldn't require any merging on the coordinator.
@@ -63,10 +87,44 @@ SELECT count(*), sum(x) FROM
    100 | 5050
 (1 row)
 
-END;
-DROP TABLE source_table, target_table, distributed_result_info;
+ROLLBACK;
+-- redistribute_task_list_results
+-- Verify that redistribute_task_list_results colocates fragments properly by reading the
+-- expected colocated results on the same node as each of two shards.
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table');
+SELECT * FROM distributed_result_info ORDER BY shardid;
+ shardid | colocated_results
+---------------------------------------------------------------------
+ 4213584 | {test_from_4213581_to_0,test_from_4213582_to_0}
+ 4213585 | {test_from_4213582_to_1,test_from_4213583_to_1}
+(2 rows)
+
+WITH shard_1 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213581_to_0,test_from_4213582_to_0}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 1
+), shard_2 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213582_to_1,test_from_4213583_to_1}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 2
+), all_rows AS (
+  (SELECT * FROM shard_1) UNION (SELECT * FROM shard_2)
+)
+SELECT count(*), sum(x) FROM all_rows;
+ count | sum
+---------------------------------------------------------------------
+   100 | 5050
+(1 row)
+
+ROLLBACK;
+DROP TABLE source_table, target_table, colocated_with_target;
 --
--- partition_task_list_results, range partitioning, text format
+-- Case 2.
+-- range partitioning, text format
+-- * partition_task_list_results
+-- * redistribute_task_list_results
 --
 CREATE TABLE source_table(a int);
 SELECT create_distributed_table('source_table', 'a', 'range');
@@ -89,6 +147,19 @@ SELECT create_distributed_table('target_table', 'a', 'range');
 CALL public.create_range_partitioned_shards('target_table',
                                             '{0,25,50,76}',
                                             '{24,49,75,200}');
+CREATE TABLE colocated_with_target(a int);
+SELECT create_distributed_table('colocated_with_target', 'a', 'range');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL public.create_range_partitioned_shards('colocated_with_target',
+                                            '{0,25,50,76}',
+                                            '{24,49,75,200}');
+-- one value per shard, so we can route calls to read_intermediate_shards
+INSERT INTO colocated_with_target VALUES (1), (26), (51), (77);
+-- partition_task_list_results
 BEGIN;
 CREATE TABLE distributed_result_info AS
   SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
@@ -98,22 +169,22 @@ CREATE TABLE distributed_result_info AS
 SELECT * FROM distributed_result_info ORDER BY resultId;
         resultid        | nodeport | rowcount | targetshardid | targetshardindex
 ---------------------------------------------------------------------
- test_from_4213586_to_0 | 57638 | 7 | 4213590 | 0
- test_from_4213586_to_1 | 57638 | 6 | 4213591 | 1
- test_from_4213586_to_2 | 57638 | 7 | 4213592 | 2
- test_from_4213586_to_3 | 57638 | 4 | 4213593 | 3
- test_from_4213587_to_0 | 57637 | 7 | 4213590 | 0
- test_from_4213587_to_1 | 57637 | 6 | 4213591 | 1
- test_from_4213587_to_2 | 57637 | 8 | 4213592 | 2
- test_from_4213587_to_3 | 57637 | 4 | 4213593 | 3
- test_from_4213588_to_0 | 57638 | 8 | 4213590 | 0
- test_from_4213588_to_1 | 57638 | 6 | 4213591 | 1
- test_from_4213588_to_2 | 57638 | 8 | 4213592 | 2
- test_from_4213588_to_3 | 57638 | 4 | 4213593 | 3
- test_from_4213589_to_0 | 57637 | 8 | 4213590 | 0
- test_from_4213589_to_1 | 57637 | 6 | 4213591 | 1
- test_from_4213589_to_2 | 57637 | 7 | 4213592 | 2
- test_from_4213589_to_3 | 57637 | 4 | 4213593 | 3
+ test_from_4213588_to_0 | 57638 | 7 | 4213592 | 0
+ test_from_4213588_to_1 | 57638 | 6 | 4213593 | 1
+ test_from_4213588_to_2 | 57638 | 7 | 4213594 | 2
+ test_from_4213588_to_3 | 57638 | 4 | 4213595 | 3
+ test_from_4213589_to_0 | 57637 | 7 | 4213592 | 0
+ test_from_4213589_to_1 | 57637 | 6 | 4213593 | 1
+ test_from_4213589_to_2 | 57637 | 8 | 4213594 | 2
+ test_from_4213589_to_3 | 57637 | 4 | 4213595 | 3
+ test_from_4213590_to_0 | 57638 | 8 | 4213592 | 0
+ test_from_4213590_to_1 | 57638 | 6 | 4213593 | 1
+ test_from_4213590_to_2 | 57638 | 8 | 4213594 | 2
+ test_from_4213590_to_3 | 57638 | 4 | 4213595 | 3
+ test_from_4213591_to_0 | 57637 | 8 | 4213592 | 0
+ test_from_4213591_to_1 | 57637 | 6 | 4213593 | 1
+ test_from_4213591_to_2 | 57637 | 7 | 4213594 | 2
+ test_from_4213591_to_3 | 57637 | 4 | 4213595 | 3
 (16 rows)
 
 -- fetch from workers
@@ -135,10 +206,190 @@ SELECT count(*), sum(x) FROM
    100 | 4550
 (1 row)
 
+ROLLBACK;
+-- redistribute_task_list_results
+-- Verify that redistribute_task_list_results colocates fragments properly by reading the
+-- expected colocated results on the same node as each of two shards.
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, 'target_table');
+SELECT * FROM distributed_result_info ORDER BY shardid;
+ shardid | colocated_results
+---------------------------------------------------------------------
+ 4213592 | {test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}
+ 4213593 | {test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}
+ 4213594 | {test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}
+ 4213595 | {test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}
+(4 rows)
+
+WITH shard_1 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 1
+), shard_2 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 26
+), shard_3 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 51
+), shard_4 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 77
+), all_rows AS (
+  (SELECT * FROM shard_1) UNION ALL (SELECT * FROM shard_2) UNION ALL
+  (SELECT * FROM shard_3) UNION ALL (SELECT * FROM shard_4)
+)
+SELECT count(*), sum(x) FROM all_rows;
+ count | sum
+---------------------------------------------------------------------
+   100 | 4550
+(1 row)
+
+ROLLBACK;
+DROP TABLE source_table, target_table, colocated_with_target;
+--
+-- Case 3.
+-- range partitioning, text format, replication factor 2 (both source and destination)
+-- composite distribution column
+--
+-- only redistribute_task_list_results
+--
+CREATE TYPE composite_key_type AS (f1 int, f2 text);
+SET citus.shard_replication_factor TO 2;
+-- source
+CREATE TABLE source_table(key composite_key_type, value int, mapped_key composite_key_type);
+SELECT create_distributed_table('source_table', 'key', 'range');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
+INSERT INTO source_table VALUES ((0, 'a'), 1, (0, 'a')); -- shard xxxxx -> shard xxxxx
+INSERT INTO source_table VALUES ((1, 'b'), 2, (26, 'b')); -- shard xxxxx -> shard xxxxx
+INSERT INTO source_table VALUES ((2, 'c'), 3, (3, 'c')); -- shard xxxxx -> shard xxxxx
+INSERT INTO source_table VALUES ((4, 'd'), 4, (27, 'd')); -- shard xxxxx -> shard xxxxx
+INSERT INTO source_table VALUES ((30, 'e'), 5, (30, 'e')); -- shard xxxxx -> shard xxxxx
+INSERT INTO source_table VALUES ((31, 'f'), 6, (31, 'f')); -- shard xxxxx -> shard xxxxx
+INSERT INTO source_table VALUES ((32, 'g'), 7, (8, 'g')); -- shard xxxxx -> shard xxxxx
+-- target
+CREATE TABLE target_table(key composite_key_type, value int);
+SELECT create_distributed_table('target_table', 'key', 'range');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
+-- colocated with target, used for routing calls to read_intermediate_results
+CREATE TABLE colocated_with_target(key composite_key_type, value_sum int);
+SELECT create_distributed_table('colocated_with_target', 'key', 'range');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL public.create_range_partitioned_shards('colocated_with_target', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
+-- one value per shard, so we can route calls to read_intermediate_shards
+INSERT INTO colocated_with_target VALUES ((0,'a'), 0);
+INSERT INTO colocated_with_target VALUES ((25, 'a'), 0);
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT mapped_key, value FROM source_table $$, 'target_table');
+SELECT * FROM distributed_result_info ORDER BY shardid;
+ shardid | colocated_results
+---------------------------------------------------------------------
+ 4213602 | {test_from_4213600_to_0,test_from_4213601_to_0}
+ 4213603 | {test_from_4213600_to_1,test_from_4213601_to_1}
+(2 rows)
+
+UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_0,test_from_4213601_to_0}'::text[], 'binary') AS res (x composite_key_type, y int))
+WHERE key=(0,'a')::composite_key_type;
+UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_1,test_from_4213601_to_1}'::text[], 'binary') AS res (x composite_key_type, y int))
+WHERE key=(25,'a')::composite_key_type;
+SELECT * FROM colocated_with_target ORDER BY key;
+  key   | value_sum
+---------------------------------------------------------------------
+ (0,a)  | 11
+ (25,a) | 17
+(2 rows)
+
 END;
-DROP TABLE source_table, target_table, distributed_result_info;
+-- verify that replicas of colocated_with_target are consistent (i.e. copies
+-- of result files in both nodes were same when calling read_intermediate_results()
+-- in the above UPDATE calls).
+\c - - - :worker_1_port
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
+  key  | value_sum
+---------------------------------------------------------------------
+ (0,a) | 11
+(1 row)
+
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
+  key   | value_sum
+---------------------------------------------------------------------
+ (25,a) | 17
+(1 row)
+
+\c - - - :worker_2_port
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
+  key  | value_sum
+---------------------------------------------------------------------
+ (0,a) | 11
+(1 row)
+
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
+  key   | value_sum
+---------------------------------------------------------------------
+ (25,a) | 17
+(1 row)
+
+\c - - - :master_port
+SET search_path TO 'distributed_intermediate_results';
+DROP TABLE source_table, target_table, colocated_with_target, distributed_result_info;
+DROP TYPE composite_key_type;
+--
+-- Case 4. target relation is a reference table or an append partitioned table
+--
+CREATE TABLE source_table(a int);
+SELECT create_distributed_table('source_table', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO source_table SELECT * FROM generate_series(1, 100);
+CREATE TABLE target_table_reference(a int);
+SELECT create_reference_table('target_table_reference');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE target_table_append(a int);
+SELECT create_distributed_table('target_table_append', 'a', 'append');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_reference');
+ERROR: repartitioning results of a tasklist is only supported when target relation is hash or range partitioned.
+ROLLBACK;
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_append');
+ERROR: repartitioning results of a tasklist is only supported when target relation is hash or range partitioned.
+ROLLBACK;
+-- clean-up
 SET client_min_messages TO WARNING;
 DROP SCHEMA distributed_intermediate_results CASCADE;
 \set VERBOSITY default
 SET client_min_messages TO DEFAULT;
 SET citus.shard_count TO DEFAULT;
+SET citus.shard_replication_factor TO DEFAULT;
@@ -3,14 +3,29 @@ CREATE SCHEMA distributed_intermediate_results;
 SET search_path TO 'distributed_intermediate_results';
 
 SET citus.next_shard_id TO 4213581;
+SET citus.shard_replication_factor TO 1;
+
+-- redistribute_task_list_results tests the internal RedistributeTaskListResults
+CREATE OR REPLACE FUNCTION pg_catalog.redistribute_task_list_results(resultIdPrefix text,
+                                                                     query text,
+                                                                     target_table regclass,
+                                                                     binaryFormat bool DEFAULT true)
+    RETURNS TABLE(shardid bigint,
+                  colocated_results text[])
+    LANGUAGE C STRICT VOLATILE
+    AS 'citus', $$redistribute_task_list_results$$;
+
 --
--- We don't have extensive tests for partition_task_results, since it will be
--- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.
+-- We don't have extensive tests for partition_task_results or
+-- redistribute_task_list_results, since they will be tested by higher level
+-- "INSERT/SELECT with repartitioning" tests anyway.
 --
 
 --
--- partition_task_list_results, hash partitioning, binary format
+-- Case 1.
+-- hash partitioning, binary format
+-- * partition_task_list_results
+-- * redistribute_task_list_results
 --
 
 CREATE TABLE source_table(a int);
@@ -22,6 +37,13 @@ CREATE TABLE target_table(a int);
 SET citus.shard_count TO 2;
 SELECT create_distributed_table('target_table', 'a');
 
+CREATE TABLE colocated_with_target(a int);
+SELECT create_distributed_table('colocated_with_target', 'a');
+-- one value per shard, so we can route calls to read_intermediate_shards
+INSERT INTO colocated_with_target VALUES (1), (2);
+
+-- partition_task_list_results
+
 -- should error out
 SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table');
 SELECT partition_task_list_results('test', $$ SELECT * FROM generate_series(1, 2) $$, 'target_table');
@@ -39,12 +61,36 @@ SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost',
 SELECT count(*), sum(x) FROM
   read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info),
                             'binary') AS res (x int);
-END;
+ROLLBACK;
 
-DROP TABLE source_table, target_table, distributed_result_info;
+-- redistribute_task_list_results
+-- Verify that redistribute_task_list_results colocates fragments properly by reading the
+-- expected colocated results on the same node as each of two shards.
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table');
+SELECT * FROM distributed_result_info ORDER BY shardid;
+WITH shard_1 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213581_to_0,test_from_4213582_to_0}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 1
+), shard_2 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213582_to_1,test_from_4213583_to_1}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 2
+), all_rows AS (
+  (SELECT * FROM shard_1) UNION (SELECT * FROM shard_2)
+)
+SELECT count(*), sum(x) FROM all_rows;
+ROLLBACK;
+
+DROP TABLE source_table, target_table, colocated_with_target;
+
 --
--- partition_task_list_results, range partitioning, text format
+-- Case 2.
+-- range partitioning, text format
+-- * partition_task_list_results
+-- * redistribute_task_list_results
 --
 CREATE TABLE source_table(a int);
 SELECT create_distributed_table('source_table', 'a', 'range');
@@ -58,7 +104,15 @@ SELECT create_distributed_table('target_table', 'a', 'range');
 CALL public.create_range_partitioned_shards('target_table',
                                             '{0,25,50,76}',
                                             '{24,49,75,200}');
+CREATE TABLE colocated_with_target(a int);
+SELECT create_distributed_table('colocated_with_target', 'a', 'range');
+CALL public.create_range_partitioned_shards('colocated_with_target',
+                                            '{0,25,50,76}',
+                                            '{24,49,75,200}');
+-- one value per shard, so we can route calls to read_intermediate_shards
+INSERT INTO colocated_with_target VALUES (1), (26), (51), (77);
+
+-- partition_task_list_results
 BEGIN;
 CREATE TABLE distributed_result_info AS
   SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
@@ -74,9 +128,135 @@ SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost',
 SELECT count(*), sum(x) FROM
   read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info),
                             'text') AS res (x int);
+ROLLBACK;
+
+-- redistribute_task_list_results
+-- Verify that redistribute_task_list_results colocates fragments properly by reading the
+-- expected colocated results on the same node as each of two shards.
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, 'target_table');
+SELECT * FROM distributed_result_info ORDER BY shardid;
+
+WITH shard_1 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 1
+), shard_2 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 26
+), shard_3 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 51
+), shard_4 AS (
+  SELECT t.* FROM colocated_with_target, (
+    SELECT * FROM read_intermediate_results('{test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}'::text[], 'binary') AS res (x int)) t
+  WHERE colocated_with_target.a = 77
+), all_rows AS (
+  (SELECT * FROM shard_1) UNION ALL (SELECT * FROM shard_2) UNION ALL
+  (SELECT * FROM shard_3) UNION ALL (SELECT * FROM shard_4)
+)
+SELECT count(*), sum(x) FROM all_rows;
+ROLLBACK;
+
+DROP TABLE source_table, target_table, colocated_with_target;
+
+
+--
+-- Case 3.
+-- range partitioning, text format, replication factor 2 (both source and destination)
+-- composite distribution column
+--
+-- only redistribute_task_list_results
+--
+CREATE TYPE composite_key_type AS (f1 int, f2 text);
+SET citus.shard_replication_factor TO 2;
+
+-- source
+CREATE TABLE source_table(key composite_key_type, value int, mapped_key composite_key_type);
+SELECT create_distributed_table('source_table', 'key', 'range');
+CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
+
+INSERT INTO source_table VALUES ((0, 'a'), 1, (0, 'a')); -- shard 1 -> shard 1
+INSERT INTO source_table VALUES ((1, 'b'), 2, (26, 'b')); -- shard 1 -> shard 2
+INSERT INTO source_table VALUES ((2, 'c'), 3, (3, 'c')); -- shard 1 -> shard 1
+INSERT INTO source_table VALUES ((4, 'd'), 4, (27, 'd')); -- shard 1 -> shard 2
+INSERT INTO source_table VALUES ((30, 'e'), 5, (30, 'e')); -- shard 2 -> shard 2
+INSERT INTO source_table VALUES ((31, 'f'), 6, (31, 'f')); -- shard 2 -> shard 2
+INSERT INTO source_table VALUES ((32, 'g'), 7, (8, 'g')); -- shard 2 -> shard 1
+
+-- target
+CREATE TABLE target_table(key composite_key_type, value int);
+SELECT create_distributed_table('target_table', 'key', 'range');
+CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
+
+-- colocated with target, used for routing calls to read_intermediate_results
+CREATE TABLE colocated_with_target(key composite_key_type, value_sum int);
+SELECT create_distributed_table('colocated_with_target', 'key', 'range');
+CALL public.create_range_partitioned_shards('colocated_with_target', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
+-- one value per shard, so we can route calls to read_intermediate_shards
+INSERT INTO colocated_with_target VALUES ((0,'a'), 0);
+INSERT INTO colocated_with_target VALUES ((25, 'a'), 0);
+
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT mapped_key, value FROM source_table $$, 'target_table');
+SELECT * FROM distributed_result_info ORDER BY shardid;
+
+UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_0,test_from_4213601_to_0}'::text[], 'binary') AS res (x composite_key_type, y int))
+WHERE key=(0,'a')::composite_key_type;
+UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_1,test_from_4213601_to_1}'::text[], 'binary') AS res (x composite_key_type, y int))
+WHERE key=(25,'a')::composite_key_type;
+
+SELECT * FROM colocated_with_target ORDER BY key;
+
 END;
-DROP TABLE source_table, target_table, distributed_result_info;
+
+-- verify that replicas of colocated_with_target are consistent (i.e. copies
+-- of result files in both nodes were same when calling read_intermediate_results()
+-- in the above UPDATE calls).
+
+\c - - - :worker_1_port
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
+
+\c - - - :worker_2_port
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
+SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
+
+\c - - - :master_port
+
+SET search_path TO 'distributed_intermediate_results';
+DROP TABLE source_table, target_table, colocated_with_target, distributed_result_info;
+DROP TYPE composite_key_type;
+
+--
+-- Case 4. target relation is a reference table or an append partitioned table
+--
+
+CREATE TABLE source_table(a int);
+SELECT create_distributed_table('source_table', 'a');
+INSERT INTO source_table SELECT * FROM generate_series(1, 100);
+
+CREATE TABLE target_table_reference(a int);
+SELECT create_reference_table('target_table_reference');
+
+CREATE TABLE target_table_append(a int);
+SELECT create_distributed_table('target_table_append', 'a', 'append');
+
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_reference');
+ROLLBACK;
+
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_append');
+ROLLBACK;
+
+-- clean-up
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA distributed_intermediate_results CASCADE;
@@ -84,3 +264,4 @@ DROP SCHEMA distributed_intermediate_results CASCADE;
 \set VERBOSITY default
 SET client_min_messages TO DEFAULT;
 SET citus.shard_count TO DEFAULT;
+SET citus.shard_replication_factor TO DEFAULT;