diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 9ff53cf37..1f04751bb 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -819,6 +819,9 @@ AdaptiveExecutor(CitusScanState *scanState) bool hasDependentJobs = HasDependentJobs(job); if (hasDependentJobs) { + /* jobs use intermediate results, which require a distributed transaction */ + UseCoordinatedTransaction(); + jobIdList = ExecuteDependentTasks(taskList, job); } @@ -828,9 +831,10 @@ AdaptiveExecutor(CitusScanState *scanState) targetPoolSize = 1; } + bool excludeFromXact = false; + TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( - distributedPlan->modLevel, taskList, - hasDependentJobs); + distributedPlan->modLevel, taskList, excludeFromXact); bool localExecutionSupported = true; DistributedExecution *execution = CreateDistributedExecution( @@ -873,11 +877,6 @@ AdaptiveExecutor(CitusScanState *scanState) FinishDistributedExecution(execution); - if (hasDependentJobs) - { - DoRepartitionCleanup(jobIdList); - } - if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT) { SortTupleStore(scanState); @@ -963,6 +962,26 @@ ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, } +/* + * ExecuteTaskList is a proxy to ExecuteTaskListExtended + * with defaults for some of the arguments. 
+ */ +uint64 +ExecuteTaskList(RowModifyLevel modLevel, List *taskList) +{ + bool localExecutionSupported = true; + ExecutionParams *executionParams = CreateBasicExecutionParams( + modLevel, taskList, MaxAdaptiveExecutorPoolSize, localExecutionSupported + ); + + bool excludeFromXact = false; + executionParams->xactProperties = DecideTransactionPropertiesForTaskList( + modLevel, taskList, excludeFromXact); + + return ExecuteTaskListExtended(executionParams); +} + + /* * ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended * with defaults for some of the arguments. @@ -2639,12 +2658,6 @@ RunDistributedExecution(DistributedExecution *execution) */ UnclaimAllSessionConnections(execution->sessionList); - /* do repartition cleanup if this is a repartition query*/ - if (list_length(execution->jobIdList) > 0) - { - DoRepartitionCleanup(execution->jobIdList); - } - if (execution->waitEventSet != NULL) { FreeWaitEventSet(execution->waitEventSet); diff --git a/src/backend/distributed/executor/directed_acyclic_graph_execution.c b/src/backend/distributed/executor/directed_acyclic_graph_execution.c index 2245db420..73ae0b591 100644 --- a/src/backend/distributed/executor/directed_acyclic_graph_execution.c +++ b/src/backend/distributed/executor/directed_acyclic_graph_execution.c @@ -39,6 +39,7 @@ static HTAB * CreateTaskHashTable(void); static bool IsAllDependencyCompleted(Task *task, HTAB *completedTasks); static void AddCompletedTasks(List *curCompletedTasks, HTAB *completedTasks); static List * FindExecutableTasks(List *allTasks, HTAB *completedTasks); +static List * RemoveMergeTasks(List *taskList); static int TaskHashCompare(const void *key1, const void *key2, Size keysize); static uint32 TaskHash(const void *key, Size keysize); static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks); @@ -64,8 +65,13 @@ ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, List *jobIds) { break; } - ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, 
curTasks, - MaxAdaptiveExecutorPoolSize, jobIds); + + /* merge tasks do not need to be executed */ + List *executableTasks = RemoveMergeTasks(curTasks); + if (list_length(executableTasks) > 0) + { + ExecuteTaskList(ROW_MODIFY_NONE, executableTasks); + } AddCompletedTasks(curTasks, completedTasks); curTasks = NIL; @@ -97,6 +103,29 @@ FindExecutableTasks(List *allTasks, HTAB *completedTasks) } +/* + * RemoveMergeTasks returns a copy of taskList that excludes all the + * merge tasks. We do this because merge tasks are currently only a + * logical concept that does not need to be executed. + */ +static List * +RemoveMergeTasks(List *taskList) +{ + List *prunedTaskList = NIL; + Task *task = NULL; + + foreach_ptr(task, taskList) + { + if (task->taskType != MERGE_TASK) + { + prunedTaskList = lappend(prunedTaskList, task); + } + } + + return prunedTaskList; +} + + /* * AddCompletedTasks adds the givens tasks to completedTasks HTAB. */ diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 8a29e633d..f7d62e157 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -59,28 +59,6 @@ typedef struct PartitioningTupleDest } PartitioningTupleDest; -/* - * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer. - * It is a separate struct to use it as a key in a hash table. - */ -typedef struct NodePair -{ - uint32 sourceNodeId; - uint32 targetNodeId; -} NodePair; - - -/* - * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched from - * the source node to the destination node in the NodePair. 
- */ -typedef struct NodeToNodeFragmentsTransfer -{ - NodePair nodes; - List *fragmentList; -} NodeToNodeFragmentsTransfer; - - /* forward declarations of local functions */ static List * WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, @@ -98,9 +76,6 @@ static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, queryNumber); static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid typeId); -static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, - Oid intervalTypeId, ArrayType **minValueArray, - ArrayType **maxValueArray); static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId); static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple, TupleDesc tupleDesc, @@ -115,8 +90,6 @@ static List ** ColocateFragmentsWithRelation(List *fragmentList, static List * ColocationTransfers(List *fragmentList, CitusTableCacheEntry *targetRelation); static List * FragmentTransferTaskList(List *fragmentListTransfers); -static char * QueryStringForFragmentsTransfer( - NodeToNodeFragmentsTransfer *fragmentsTransfer); static void ExecuteFetchTaskList(List *fetchTaskList); @@ -360,7 +333,7 @@ SourceShardPrefix(const char *resultPrefix, uint64 shardId) * ShardMinMaxValueArrays returns min values and max values of given shard * intervals. Returned arrays are text arrays. */ -static void +void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, Oid intervalTypeOutFunc, ArrayType **minValueArray, ArrayType **maxValueArray) @@ -632,7 +605,7 @@ FragmentTransferTaskList(List *fragmentListTransfers) * result fragments from source node to target node. See the structure of * NodeToNodeFragmentsTransfer for details of how these are decided. 
*/ -static char * +char * QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer) { StringInfo queryString = makeStringInfo(); @@ -667,7 +640,7 @@ QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer) quote_literal_cstr(sourceNode->workerName), sourceNode->workerPort); - ereport(DEBUG3, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName, + ereport(DEBUG4, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName, sourceNode->workerPort, queryString->data))); return queryString->data; diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 3442e23a3..20c95fe06 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -17,6 +17,7 @@ #include "pgstat.h" #include "catalog/pg_enum.h" +#include "catalog/pg_type.h" #include "commands/copy.h" #include "distributed/commands/multi_copy.h" #include "distributed/connection_management.h" @@ -916,6 +917,8 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) StringInfo beginAndSetXactId = BeginAndSetDistributedTransactionIdCommand(); ExecuteCriticalRemoteCommand(connection, beginAndSetXactId->data); + CreateIntermediateResultsDirectory(); + for (resultIndex = 0; resultIndex < resultCount; resultIndex++) { char *resultId = TextDatumGetCString(resultIdArray[resultIndex]); @@ -938,6 +941,19 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId) { + char *localPath = QueryResultFileName(resultId); + + struct stat fileStat; + int statOK = stat(localPath, &fileStat); + if (statOK == 0) + { + /* + * File exists, most likely because we are trying to fetch a + * a file from a node to itself. Skip doing work. 
+ */ + return fileStat.st_size; + } + uint64 totalBytesWritten = 0; StringInfo copyCommand = makeStringInfo(); @@ -948,8 +964,6 @@ FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId) int socket = PQsocket(pgConn); bool raiseErrors = true; - CreateIntermediateResultsDirectory(); - appendStringInfo(copyCommand, "COPY \"%s\" TO STDOUT WITH (format result)", resultId); @@ -966,7 +980,6 @@ FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId) PQclear(result); - char *localPath = QueryResultFileName(resultId); File fileDesc = FileOpenForTransmit(localPath, fileFlags, fileMode); FileCompat fileCompat = FileCompatFromFileStart(fileDesc); diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 26bf12ba0..7c0426c0a 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -915,6 +915,25 @@ TaskAccessesLocalNode(Task *task) } +/* + * EnsureCompatibleLocalExecutionState makes sure that the tasks won't have + * any visibility problems because of local execution. + */ +void +EnsureCompatibleLocalExecutionState(List *taskList) +{ + /* + * We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily + * iterating the task list in AnyTaskAccessesLocalNode. + */ + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED && + AnyTaskAccessesLocalNode(taskList)) + { + ErrorIfTransactionAccessedPlacementsLocally(); + } +} + + /* * ErrorIfTransactionAccessedPlacementsLocally errors out if a local query * on any shard has already been executed in the same transaction. 
diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index 8feb31a95..129a7d130 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -86,6 +86,9 @@ typedef struct PartitionedResultDestReceiver /* keeping track of which partitionDestReceivers have been started */ Bitmapset *startedDestReceivers; + + /* whether NULL partition column values are allowed */ + bool allowNullPartitionColumnValues; } PartitionedResultDestReceiver; static Portal StartPortalForQueryExecution(const char *queryString); @@ -99,7 +102,8 @@ static DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnInd shardSearchInfo, DestReceiver ** partitionedDestReceivers, - bool lazyStartup); + bool lazyStartup, + bool allowNullPartitionValues); static void PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static bool PartitionedResultDestReceiverReceive(TupleTableSlot *slot, @@ -148,6 +152,8 @@ worker_partition_query_result(PG_FUNCTION_ARGS) int32 maxValuesCount = ArrayObjectCount(maxValuesArray); bool binaryCopy = PG_GETARG_BOOL(6); + bool allowNullPartitionColumnValues = PG_GETARG_BOOL(7); + bool generateEmptyResults = PG_GETARG_BOOL(8); if (!IsMultiStatementTransaction()) { @@ -226,13 +232,21 @@ worker_partition_query_result(PG_FUNCTION_ARGS) dests[partitionIndex] = partitionDest; } - const bool lazyStartup = true; + /* + * If we are asked to generated empty results, use non-lazy startup. + * + * The rStartup of the FileDestReceiver will be called for all partitions + * and generate empty files, which may still have binary header/footer. 
+ */ + const bool lazyStartup = !generateEmptyResults; + DestReceiver *dest = CreatePartitionedResultDestReceiver( partitionColumnIndex, partitionCount, shardSearchInfo, dests, - lazyStartup); + lazyStartup, + allowNullPartitionColumnValues); /* execute the query */ PortalRun(portal, FETCH_ALL, false, true, dest, dest, NULL); @@ -390,7 +404,8 @@ CreatePartitionedResultDestReceiver(int partitionColumnIndex, int partitionCount, CitusTableCacheEntry *shardSearchInfo, DestReceiver **partitionedDestReceivers, - bool lazyStartup) + bool lazyStartup, + bool allowNullPartitionColumnValues) { PartitionedResultDestReceiver *resultDest = palloc0(sizeof(PartitionedResultDestReceiver)); @@ -409,6 +424,7 @@ CreatePartitionedResultDestReceiver(int partitionColumnIndex, resultDest->partitionDestReceivers = partitionedDestReceivers; resultDest->startedDestReceivers = NULL; resultDest->lazyStartup = lazyStartup; + resultDest->allowNullPartitionColumnValues = allowNullPartitionColumnValues; return (DestReceiver *) resultDest; } @@ -458,23 +474,40 @@ PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; + int partitionIndex; + if (columnNulls[self->partitionColumnIndex]) { - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("the partition column value cannot be NULL"))); + if (self->allowNullPartitionColumnValues) + { + /* + * NULL values go into the first partition for both hash- and range- + * partitioning, since that is the only way to guarantee that there is + * always a partition for NULL and that it is always the same partition. 
+ */ + partitionIndex = 0; + } + else + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("the partition column value cannot be NULL"))); + } } - - Datum partitionColumnValue = columnValues[self->partitionColumnIndex]; - ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, - self->shardSearchInfo); - if (shardInterval == NULL) + else { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find shard for partition column " - "value"))); + Datum partitionColumnValue = columnValues[self->partitionColumnIndex]; + ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, + self->shardSearchInfo); + if (shardInterval == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find shard for partition column " + "value"))); + } + + partitionIndex = shardInterval->shardIndex; } - int partitionIndex = shardInterval->shardIndex; DestReceiver *partitionDest = self->partitionDestReceivers[partitionIndex]; /* check if this partitionDestReceiver has been started before, start if not */ diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index dfe8efd9e..29d994e59 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -44,12 +44,8 @@ #include "distributed/worker_transaction.h" -static List * CreateTemporarySchemasForMergeTasks(Job *topLevelJob); static List * ExtractJobsInJobTree(Job *job); static void TraverseJobTree(Job *curJob, List **jobs); -static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner); -static char * GenerateJobCommands(List *jobIds, char *templateCommand); -static char * GenerateDeleteJobsCommand(List *jobIds); /* @@ -60,13 +56,8 @@ static char * GenerateDeleteJobsCommand(List *jobIds); List * ExecuteDependentTasks(List *topLevelTasks, Job 
*topLevelJob) { - EnsureNoModificationsHaveBeenDone(); - List *allTasks = CreateTaskListForJobTree(topLevelTasks); - - EnsureCompatibleLocalExecutionState(allTasks); - - List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); + List *jobIds = ExtractJobsInJobTree(topLevelJob); ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds); @@ -74,40 +65,6 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) } -/* - * EnsureCompatibleLocalExecutionState makes sure that the tasks won't have - * any visibility problems because of local execution. - */ -void -EnsureCompatibleLocalExecutionState(List *taskList) -{ - /* - * We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily - * iterating the task list in AnyTaskAccessesLocalNode. - */ - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED && - AnyTaskAccessesLocalNode(taskList)) - { - ErrorIfTransactionAccessedPlacementsLocally(); - } -} - - -/* - * CreateTemporarySchemasForMergeTasks creates the necessary schemas that will be used - * later in each worker. Single transaction is used to create the schemas. - */ -static List * -CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) -{ - List *jobIds = ExtractJobsInJobTree(topLeveLJob); - char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToWorkersInParallel(ALL_SHARD_NODES, createSchemasCommand, - CitusExtensionOwnerName()); - return jobIds; -} - - /* * ExtractJobsInJobTree returns all job ids in the job tree * where the given job is root. @@ -139,67 +96,3 @@ TraverseJobTree(Job *curJob, List **jobIds) TraverseJobTree(childJob, jobIds); } } - - -/* - * GenerateCreateSchemasCommand returns concatanated create schema commands. 
- */ -static char * -GenerateCreateSchemasCommand(List *jobIds, char *ownerName) -{ - StringInfo createSchemaCommand = makeStringInfo(); - - uint64 *jobIdPointer = NULL; - foreach_ptr(jobIdPointer, jobIds) - { - uint64 jobId = *jobIdPointer; - appendStringInfo(createSchemaCommand, WORKER_CREATE_SCHEMA_QUERY, - jobId, quote_literal_cstr(ownerName)); - } - return createSchemaCommand->data; -} - - -/* - * GenerateJobCommands returns concatenated commands with the given template - * command for each job id from the given job ids. The returned command is - * exactly list_length(jobIds) subcommands. - * E.g create_schema(jobId1); create_schema(jobId2); ... - * This way we can send the command in just one latency to a worker. - */ -static char * -GenerateJobCommands(List *jobIds, char *templateCommand) -{ - StringInfo createSchemaCommand = makeStringInfo(); - - uint64 *jobIdPointer = NULL; - foreach_ptr(jobIdPointer, jobIds) - { - uint64 jobId = *jobIdPointer; - appendStringInfo(createSchemaCommand, templateCommand, jobId); - } - return createSchemaCommand->data; -} - - -/* - * DoRepartitionCleanup removes the temporary job directories and schemas that are - * used for repartition queries for the given job ids. - */ -void -DoRepartitionCleanup(List *jobIds) -{ - SendCommandToWorkersOptionalInParallel(ALL_SHARD_NODES, GenerateDeleteJobsCommand( - jobIds), - CitusExtensionOwnerName()); -} - - -/* - * GenerateDeleteJobsCommand returns concatanated remove job dir commands. 
- */ -static char * -GenerateDeleteJobsCommand(List *jobIds) -{ - return GenerateJobCommands(jobIds, WORKER_REPARTITION_CLEANUP_QUERY); -} diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 28b750e8f..9f578eac9 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -40,6 +40,7 @@ #include "distributed/colocation_utils.h" #include "distributed/deparse_shard_query.h" #include "distributed/coordinator_protocol.h" +#include "distributed/intermediate_results.h" #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_join_order.h" @@ -52,15 +53,16 @@ #include "distributed/pg_dist_shard.h" #include "distributed/query_pushdown_planning.h" #include "distributed/query_utils.h" +#include "distributed/recursive_planning.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_pruning.h" #include "distributed/string_utils.h" - #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/print.h" #include "optimizer/clauses.h" #include "nodes/pathnodes.h" #include "optimizer/optimizer.h" @@ -157,8 +159,6 @@ static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Oid baseRelationId, BoundaryNodeJobType boundaryNodeJobType); static uint32 HashPartitionCount(void); -static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray, - uint32 shardIntervalCount); /* Local functions forward declarations for task list creation and helper functions */ static Job * BuildJobTreeTaskList(Job *jobTree, @@ -195,11 +195,11 @@ static bool JoinPrunable(RangeTableFragment *leftFragment, static ShardInterval * FragmentInterval(RangeTableFragment *fragment); static StringInfo 
FragmentIntervalString(ShardInterval *fragmentInterval); static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragmentList); -static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId); static List * BuildRelationShardList(List *rangeTableList, List *fragmentList); static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList); static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment); +static List * FetchTaskResultNameList(List *mapOutputFetchTaskList); static uint64 AnchorShardId(List *fragmentList, uint32 anchorRangeTableId); static List * PruneSqlTaskDependencies(List *sqlTaskList); static List * AssignTaskList(List *sqlTaskList); @@ -218,11 +218,13 @@ static void AssignDataFetchDependencies(List *taskList); static uint32 TaskListHighestTaskId(List *taskList); static List * MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList); static StringInfo CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, - uint32 partitionColumnIndex); + uint32 partitionColumnIndex, bool useBinaryFormat); +static char * PartitionResultNamePrefix(uint64 jobId, int32 taskId); +static char * PartitionResultName(uint64 jobId, uint32 taskId, uint32 partitionId); +static ShardInterval ** RangeIntervalArrayWithNullBucket(ShardInterval **intervalArray, + int intervalCount); static List * MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex); -static StringInfo ColumnNameArrayString(uint32 columnCount, uint64 generatingJobId); -static StringInfo ColumnTypeArrayString(List *targetEntryList); static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr); static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr); @@ -853,10 +855,14 @@ TargetEntryList(List *expressionList) foreach(expressionCell, expressionList) { Expr *expression = (Expr *) lfirst(expressionCell); + int columnNumber = list_length(targetEntryList) + 1; + + StringInfo columnName = 
makeStringInfo(); + appendStringInfo(columnName, "column%d", columnNumber); + + TargetEntry *targetEntry = makeTargetEntry(expression, columnNumber, + columnName->data, false); - TargetEntry *targetEntry = makeTargetEntry(expression, - list_length(targetEntryList) + 1, - NULL, false); targetEntryList = lappend(targetEntryList, targetEntry); } @@ -2043,45 +2049,6 @@ HashPartitionCount(void) } -/* - * SplitPointObject walks over shard intervals in the given array, extracts each - * shard interval's minimum value, sorts and inserts these minimum values into a - * new array. This sorted array is then used by the MapMerge job. - */ -static ArrayType * -SplitPointObject(ShardInterval **shardIntervalArray, uint32 shardIntervalCount) -{ - Oid typeId = InvalidOid; - bool typeByValue = false; - char typeAlignment = 0; - int16 typeLength = 0; - - /* allocate an array for shard min values */ - uint32 minDatumCount = shardIntervalCount; - Datum *minDatumArray = palloc0(minDatumCount * sizeof(Datum)); - - for (uint32 intervalIndex = 0; intervalIndex < shardIntervalCount; intervalIndex++) - { - ShardInterval *shardInterval = shardIntervalArray[intervalIndex]; - minDatumArray[intervalIndex] = shardInterval->minValue; - Assert(shardInterval->minValueExists); - - /* resolve the datum type on the first pass */ - if (intervalIndex == 0) - { - typeId = shardInterval->valueTypeId; - } - } - - /* construct the split point object from the sorted array */ - get_typlenbyvalalign(typeId, &typeLength, &typeByValue, &typeAlignment); - ArrayType *splitPointObject = construct_array(minDatumArray, minDatumCount, typeId, - typeLength, typeByValue, typeAlignment); - - return splitPointObject; -} - - /* ------------------------------------------------------------ * Functions that relate to building and assigning tasks follow * ------------------------------------------------------------ @@ -4097,34 +4064,6 @@ DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragmentList) } -/* Helper 
function to return a datum array's external string representation. */ -static StringInfo -DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId) -{ - int16 typeLength = 0; - bool typeByValue = false; - char typeAlignment = 0; - - /* construct the array object from the given array */ - get_typlenbyvalalign(datumTypeId, &typeLength, &typeByValue, &typeAlignment); - ArrayType *arrayObject = construct_array(datumArray, datumCount, datumTypeId, - typeLength, typeByValue, typeAlignment); - Datum arrayObjectDatum = PointerGetDatum(arrayObject); - - /* convert the array object to its string representation */ - FmgrInfo *arrayOutFunction = (FmgrInfo *) palloc0(sizeof(FmgrInfo)); - fmgr_info(F_ARRAY_OUT, arrayOutFunction); - - Datum arrayStringDatum = FunctionCall1(arrayOutFunction, arrayObjectDatum); - char *arrayString = DatumGetCString(arrayStringDatum); - - StringInfo arrayStringInfo = makeStringInfo(); - appendStringInfo(arrayStringInfo, "%s", arrayString); - - return arrayStringInfo; -} - - /* * CreateBasicTask creates a task, initializes fields that are common to each task, * and returns the created task. 
@@ -4234,19 +4173,26 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment) else if (fragmentType == CITUS_RTE_REMOTE_QUERY) { Task *mergeTask = (Task *) fragment->fragmentReference; - uint64 jobId = mergeTask->jobId; - uint32 taskId = mergeTask->taskId; + List *mapOutputFetchTaskList = mergeTask->dependentTaskList; + List *resultNameList = FetchTaskResultNameList(mapOutputFetchTaskList); + List *mapJobTargetList = mergeTask->mapJobTargetList; - StringInfo jobSchemaName = JobSchemaName(jobId); - StringInfo taskTableName = TaskTableName(taskId); + /* determine whether all types have binary input/output functions */ + bool useBinaryFormat = CanUseBinaryCopyFormatForTargetList(mapJobTargetList); - StringInfo aliasNameString = makeStringInfo(); - appendStringInfo(aliasNameString, "%s.%s", - jobSchemaName->data, taskTableName->data); + /* generate the query on the intermediate result */ + Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(mapJobTargetList, + NIL, + resultNameList, + useBinaryFormat); - aliasName = aliasNameString->data; - fragmentName = taskTableName->data; - schemaName = jobSchemaName->data; + /* we only really care about the function RTE */ + RangeTblEntry *readIntermediateResultsRTE = linitial(fragmentSetQuery->rtable); + + /* crudely override the fragment RTE */ + *rangeTableEntry = *readIntermediateResultsRTE; + + return rangeTableEntry->alias; } /* @@ -4267,6 +4213,30 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment) } +/* + * FetchTaskResultNameList builds a list of result names that reflect + * the output of map-fetch tasks. 
+ */ +static List * +FetchTaskResultNameList(List *mapOutputFetchTaskList) +{ + List *resultNameList = NIL; + Task *mapOutputFetchTask = NULL; + + foreach_ptr(mapOutputFetchTask, mapOutputFetchTaskList) + { + Task *mapTask = linitial(mapOutputFetchTask->dependentTaskList); + int partitionId = mapOutputFetchTask->partitionId; + char *resultName = + PartitionResultName(mapTask->jobId, mapTask->taskId, partitionId); + + resultNameList = lappend(resultNameList, resultName); + } + + return resultNameList; +} + + /* * AnchorShardId walks over each fragment in the given fragment list, finds the * fragment that corresponds to the given anchor range tableId, and returns this @@ -4377,17 +4347,28 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) filterQuery->targetList); } + /* determine whether all types have binary input/output functions */ + bool useBinaryFormat = CanUseBinaryCopyFormatForTargetList(filterQuery->targetList); + foreach(filterTaskCell, filterTaskList) { Task *filterTask = (Task *) lfirst(filterTaskCell); StringInfo mapQueryString = CreateMapQueryString(mapMergeJob, filterTask, - partitionColumnResNo); + partitionColumnResNo, + useBinaryFormat); /* convert filter query task into map task */ Task *mapTask = filterTask; SetTaskQueryString(mapTask, mapQueryString->data); mapTask->taskType = MAP_TASK; + /* + * We do not support fail-over in case of map tasks, since we would also + * have to fail over the corresponding merge tasks. We therefore truncate + * the list down to the first element. 
+ */ + mapTask->taskPlacementList = list_truncate(mapTask->taskPlacementList, 1); + mapTaskList = lappend(mapTaskList, mapTask); } @@ -4428,21 +4409,19 @@ PartitionColumnIndex(Var *targetVar, List *targetList) */ static StringInfo CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, - uint32 partitionColumnIndex) + uint32 partitionColumnIndex, bool useBinaryFormat) { uint64 jobId = filterTask->jobId; uint32 taskId = filterTask->taskId; + char *resultNamePrefix = PartitionResultNamePrefix(jobId, taskId); /* wrap repartition query string around filter query string */ StringInfo mapQueryString = makeStringInfo(); char *filterQueryString = TaskQueryString(filterTask); - char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); PartitionType partitionType = mapMergeJob->partitionType; Var *partitionColumn = mapMergeJob->partitionColumn; Oid partitionColumnType = partitionColumn->vartype; - char *partitionColumnTypeFullName = format_type_be_qualified(partitionColumnType); - int32 partitionColumnTypeMod = partitionColumn->vartypmod; ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray; uint32 intervalCount = mapMergeJob->partitionCount; @@ -4450,38 +4429,101 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, if (partitionType == DUAL_HASH_PARTITION_TYPE) { partitionColumnType = INT4OID; - partitionColumnTypeMod = get_typmodin(INT4OID); intervalArray = GenerateSyntheticShardIntervalArray(intervalCount); } else if (partitionType == SINGLE_HASH_PARTITION_TYPE) { partitionColumnType = INT4OID; - partitionColumnTypeMod = get_typmodin(INT4OID); } - - ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount); - StringInfo splitPointString = ArrayObjectToString(splitPointObject, - partitionColumnType, - partitionColumnTypeMod); - - char *partitionCommand = NULL; - if (partitionType == RANGE_PARTITION_TYPE) + else if (partitionType == RANGE_PARTITION_TYPE) { - partitionCommand = RANGE_PARTITION_COMMAND; - 
} - else - { - partitionCommand = HASH_PARTITION_COMMAND; + /* add a partition for NULL values at index 0 */ + intervalArray = RangeIntervalArrayWithNullBucket(intervalArray, intervalCount); + intervalCount++; } - char *partitionColumnIndextText = ConvertIntToString(partitionColumnIndex); - appendStringInfo(mapQueryString, partitionCommand, jobId, taskId, - filterQueryEscapedText, partitionColumnIndextText, - partitionColumnTypeFullName, splitPointString->data); + Oid intervalTypeOutFunc = InvalidOid; + bool intervalTypeVarlena = false; + ArrayType *minValueArray = NULL; + ArrayType *maxValueArray = NULL; + + getTypeOutputInfo(partitionColumnType, &intervalTypeOutFunc, &intervalTypeVarlena); + + ShardMinMaxValueArrays(intervalArray, intervalCount, intervalTypeOutFunc, + &minValueArray, &maxValueArray); + + StringInfo minValuesString = ArrayObjectToString(minValueArray, TEXTOID, + InvalidOid); + StringInfo maxValuesString = ArrayObjectToString(maxValueArray, TEXTOID, + InvalidOid); + + char *partitionMethodString = partitionType == RANGE_PARTITION_TYPE ? + "range" : "hash"; + + /* + * Non-partition columns can easily contain NULL values, so we allow NULL + * values in the column by which we re-partition. They will end up in the + * first partition. + */ + bool allowNullPartitionColumnValue = true; + + /* + * We currently generate empty results for each partition and fetch all of them. + */ + bool generateEmptyResults = true; + + appendStringInfo(mapQueryString, + "SELECT partition_index" + ", %s || '_' || partition_index::text " + ", rows_written " + "FROM pg_catalog.worker_partition_query_result" + "(%s,%s,%d,%s,%s,%s,%s,%s,%s) WHERE rows_written > 0", + quote_literal_cstr(resultNamePrefix), + quote_literal_cstr(resultNamePrefix), + quote_literal_cstr(filterQueryString), + partitionColumnIndex - 1, + quote_literal_cstr(partitionMethodString), + minValuesString->data, + maxValuesString->data, + useBinaryFormat ? "true" : "false", + allowNullPartitionColumnValue ? 
"true" : "false", + generateEmptyResults ? "true" : "false"); + return mapQueryString; } +/* + * PartitionResultNamePrefix returns the prefix we use for worker_partition_query_result + * results. Each result name is this prefix followed by '_' and the partition index. + */ +static char * +PartitionResultNamePrefix(uint64 jobId, int32 taskId) +{ + StringInfo resultNamePrefix = makeStringInfo(); + + appendStringInfo(resultNamePrefix, "repartition_" UINT64_FORMAT "_%u", jobId, taskId); + + return resultNamePrefix->data; +} + + +/* + * PartitionResultName returns the name of a worker_partition_query_result result for + * a specific partition. + */ +static char * +PartitionResultName(uint64 jobId, uint32 taskId, uint32 partitionId) +{ + StringInfo resultName = makeStringInfo(); + char *resultNamePrefix = PartitionResultNamePrefix(jobId, taskId); + + appendStringInfo(resultName, "%s_%d", resultNamePrefix, partitionId); + + return resultName->data; +} + + /* * GenerateSyntheticShardIntervalArray returns a shard interval pointer array * which has a uniform hash distribution for the given input partitionCount. @@ -4504,6 +4546,12 @@ GenerateSyntheticShardIntervalArray(int partitionCount) int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); + /* extend the last range to cover the full range of integers */ + if (shardIndex == (partitionCount - 1)) + { + shardMaxHashToken = PG_INT32_MAX; + } + shardInterval->relationId = InvalidOid; shardInterval->minValueExists = true; shardInterval->minValue = Int32GetDatum(shardMinHashToken); @@ -4521,6 +4569,34 @@ GenerateSyntheticShardIntervalArray(int partitionCount) } +/* + * RangeIntervalArrayWithNullBucket prepends an additional bucket for NULL values + * to intervalArray and returns the result. + * + * When we support NULL values in (range-partitioned) shards, we will need to revise + * this logic, since there may already be an interval for NULL values. 
+ */ +static ShardInterval ** +RangeIntervalArrayWithNullBucket(ShardInterval **intervalArray, int intervalCount) +{ + int fullIntervalCount = intervalCount + 1; + ShardInterval **fullIntervalArray = + palloc0(fullIntervalCount * sizeof(ShardInterval *)); + + fullIntervalArray[0] = CitusMakeNode(ShardInterval); + fullIntervalArray[0]->minValueExists = true; + fullIntervalArray[0]->maxValueExists = true; + fullIntervalArray[0]->valueTypeId = intervalArray[0]->valueTypeId; + + for (int intervalIndex = 1; intervalIndex < fullIntervalCount; intervalIndex++) + { + fullIntervalArray[intervalIndex] = intervalArray[intervalIndex - 1]; + } + + return fullIntervalArray; +} + + /* * Determine RowModifyLevel required for given query */ @@ -4598,7 +4674,7 @@ ArrayObjectToString(ArrayType *arrayObject, Oid columnType, int32 columnTypeMod) char *arrayOutputEscapedText = quote_literal_cstr(arrayOutputText); /* add an explicit cast to array's string representation */ - char *arrayOutTypeName = format_type_with_typemod(arrayOutType, columnTypeMod); + char *arrayOutTypeName = format_type_be(arrayOutType); StringInfo arrayString = makeStringInfo(); appendStringInfo(arrayString, "%s::%s", @@ -4660,17 +4736,9 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) Query *reduceQuery = mapMergeJob->reduceQuery; if (reduceQuery == NULL) { - uint32 columnCount = (uint32) list_length(targetEntryList); - StringInfo columnNames = ColumnNameArrayString(columnCount, jobId); - StringInfo columnTypes = ColumnTypeArrayString(targetEntryList); - - StringInfo mergeQueryString = makeStringInfo(); - appendStringInfo(mergeQueryString, MERGE_FILES_INTO_TABLE_COMMAND, - jobId, taskIdIndex, columnNames->data, columnTypes->data); - - /* create merge task */ + /* create logical merge task (not executed, but useful for bookkeeping) */ mergeTask = CreateBasicTask(jobId, mergeTaskId, MERGE_TASK, - mergeQueryString->data); + ""); } mergeTask->partitionId = partitionId; taskIdIndex++; 
@@ -4682,26 +4750,35 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) /* find the node name/port for map task's execution */ List *mapTaskPlacementList = mapTask->taskPlacementList; - ShardPlacement *mapTaskPlacement = linitial(mapTaskPlacementList); - char *mapTaskNodeName = mapTaskPlacement->nodeName; - uint32 mapTaskNodePort = mapTaskPlacement->nodePort; + + char *partitionResultName = + PartitionResultName(jobId, mapTask->taskId, partitionId); + + /* we currently only fetch a single fragment at a time */ + DistributedResultFragment singleFragmentTransfer; + singleFragmentTransfer.resultId = partitionResultName; + singleFragmentTransfer.nodeId = mapTaskPlacement->nodeId; + singleFragmentTransfer.rowCount = 0; + singleFragmentTransfer.targetShardId = INVALID_SHARD_ID; + singleFragmentTransfer.targetShardIndex = partitionId; + + NodeToNodeFragmentsTransfer fragmentsTransfer; + fragmentsTransfer.nodes.sourceNodeId = mapTaskPlacement->nodeId; /* - * We will use the first node even if replication factor is greater than 1 - * When replication factor is greater than 1 and there - * is a connection problem to the node that has done the map task, we will get - * an error in fetch task execution. + * Target node is not yet decided, and not necessary for + * QueryStringForFragmentsTransfer. 
*/ - StringInfo mapFetchQueryString = makeStringInfo(); - appendStringInfo(mapFetchQueryString, MAP_OUTPUT_FETCH_COMMAND, - mapTask->jobId, mapTask->taskId, partitionId, - mergeTaskId, /* fetch results to merge task */ - mapTaskNodeName, mapTaskNodePort); + fragmentsTransfer.nodes.targetNodeId = -1; + + fragmentsTransfer.fragmentList = list_make1(&singleFragmentTransfer); + + char *fetchQueryString = QueryStringForFragmentsTransfer(&fragmentsTransfer); Task *mapOutputFetchTask = CreateBasicTask(jobId, taskIdIndex, MAP_OUTPUT_FETCH_TASK, - mapFetchQueryString->data); + fetchQueryString); mapOutputFetchTask->partitionId = partitionId; mapOutputFetchTask->upstreamTaskId = mergeTaskId; mapOutputFetchTask->dependentTaskList = list_make1(mapTask); @@ -4712,6 +4789,7 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) /* merge task depends on completion of fetch tasks */ mergeTask->dependentTaskList = mapOutputFetchTaskList; + mergeTask->mapJobTargetList = targetEntryList; /* if single repartitioned, each merge task represents an interval */ if (mapMergeJob->partitionType == RANGE_PARTITION_TYPE) @@ -4738,71 +4816,6 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) } -/* - * ColumnNameArrayString creates a list of column names for a merged table, and - * outputs this list of column names in their (array) string representation. 
- */ -static StringInfo -ColumnNameArrayString(uint32 columnCount, uint64 generatingJobId) -{ - Datum *columnNameArray = palloc0(columnCount * sizeof(Datum)); - uint32 columnNameIndex = 0; - - /* build list of intermediate column names, generated by given jobId */ - List *columnNameList = DerivedColumnNameList(columnCount, generatingJobId); - - ListCell *columnNameCell = NULL; - foreach(columnNameCell, columnNameList) - { - Value *columnNameValue = (Value *) lfirst(columnNameCell); - char *columnNameString = strVal(columnNameValue); - Datum columnName = CStringGetDatum(columnNameString); - - columnNameArray[columnNameIndex] = columnName; - columnNameIndex++; - } - - StringInfo columnNameArrayString = DatumArrayString(columnNameArray, columnCount, - CSTRINGOID); - - return columnNameArrayString; -} - - -/* - * ColumnTypeArrayString resolves a list of column types for a merged table, and - * outputs this list of column types in their (array) string representation. - */ -static StringInfo -ColumnTypeArrayString(List *targetEntryList) -{ - ListCell *targetEntryCell = NULL; - - uint32 columnCount = (uint32) list_length(targetEntryList); - Datum *columnTypeArray = palloc0(columnCount * sizeof(Datum)); - uint32 columnTypeIndex = 0; - - foreach(targetEntryCell, targetEntryList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - Node *columnExpression = (Node *) targetEntry->expr; - Oid columnTypeId = exprType(columnExpression); - int32 columnTypeMod = exprTypmod(columnExpression); - - char *columnTypeName = format_type_with_typemod(columnTypeId, columnTypeMod); - Datum columnType = CStringGetDatum(columnTypeName); - - columnTypeArray[columnTypeIndex] = columnType; - columnTypeIndex++; - } - - StringInfo columnTypeArrayString = DatumArrayString(columnTypeArray, columnCount, - CSTRINGOID); - - return columnTypeArrayString; -} - - /* * AssignTaskList assigns locations to given tasks based on dependencies between * tasks and configured task assignment 
policies. The function also handles the @@ -5392,6 +5405,7 @@ ActiveShardPlacementLists(List *taskList) /* sort shard placements by their creation time */ activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements); + shardPlacementLists = lappend(shardPlacementLists, activeShardPlacementList); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index a3a6cb3c7..e6b98a843 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -603,7 +603,7 @@ RegisterCitusConfigVariables(void) false, #endif PGC_SIGHUP, - GUC_STANDARD, + GUC_NO_SHOW_ALL, NULL, NULL, NULL); DefineCustomBoolVariable( diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 849b28761..78156d634 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -42,8 +42,9 @@ GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; #include "udfs/pg_cancel_backend/11.0-1.sql" #include "udfs/pg_terminate_backend/11.0-1.sql" +#include "udfs/worker_partition_query_result/11.0-1.sql" -DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text); +DROP FUNCTION pg_catalog.master_apply_delete_command(text); DROP FUNCTION pg_catalog.master_get_table_metadata(text); DROP FUNCTION pg_catalog.master_append_table_to_shard(bigint, text, text, integer); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index ba13b134a..5da5752ca 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -1,6 +1,9 @@ -- citus--11.0-1--10.2-4 DROP FUNCTION pg_catalog.create_distributed_function(regprocedure, text, text, bool); + +#include 
"../udfs/worker_partition_query_result/9.2-1.sql" + CREATE FUNCTION pg_catalog.master_apply_delete_command(text) RETURNS integer LANGUAGE C STRICT @@ -21,6 +24,7 @@ CREATE FUNCTION pg_catalog.master_get_table_metadata( AS 'MODULE_PATHNAME', $$master_get_table_metadata$$; COMMENT ON FUNCTION master_get_table_metadata(relation_name text) IS 'fetch metadata values for the table'; + ALTER TABLE pg_catalog.pg_dist_partition DROP COLUMN autoconverted; CREATE FUNCTION master_append_table_to_shard(bigint, text, text, integer) diff --git a/src/backend/distributed/sql/udfs/worker_partition_query_result/11.0-1.sql b/src/backend/distributed/sql/udfs/worker_partition_query_result/11.0-1.sql new file mode 100644 index 000000000..d5c6fb8de --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_partition_query_result/11.0-1.sql @@ -0,0 +1,20 @@ +DROP FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_partition_query_result( + result_prefix text, + query text, + partition_column_index int, + partition_method citus.distribution_type, + partition_min_values text[], + partition_max_values text[], + binary_copy boolean, + allow_null_partition_column boolean DEFAULT false, + generate_empty_results boolean DEFAULT false, + OUT partition_index int, + OUT rows_written bigint, + OUT bytes_written bigint) +RETURNS SETOF record +LANGUAGE C STRICT VOLATILE +AS 'MODULE_PATHNAME', $$worker_partition_query_result$$; +COMMENT ON FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean, boolean, boolean) +IS 'execute a query and partitions its results in set of local result files'; diff --git a/src/backend/distributed/sql/udfs/worker_partition_query_result/latest.sql b/src/backend/distributed/sql/udfs/worker_partition_query_result/latest.sql index bda8384fb..d5c6fb8de 100644 --- 
a/src/backend/distributed/sql/udfs/worker_partition_query_result/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_partition_query_result/latest.sql @@ -1,3 +1,5 @@ +DROP FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean); + CREATE OR REPLACE FUNCTION pg_catalog.worker_partition_query_result( result_prefix text, query text, @@ -5,12 +7,14 @@ CREATE OR REPLACE FUNCTION pg_catalog.worker_partition_query_result( partition_method citus.distribution_type, partition_min_values text[], partition_max_values text[], - binaryCopy boolean, + binary_copy boolean, + allow_null_partition_column boolean DEFAULT false, + generate_empty_results boolean DEFAULT false, OUT partition_index int, OUT rows_written bigint, OUT bytes_written bigint) RETURNS SETOF record LANGUAGE C STRICT VOLATILE AS 'MODULE_PATHNAME', $$worker_partition_query_result$$; -COMMENT ON FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean) +COMMENT ON FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean, boolean, boolean) IS 'execute a query and partitions its results in set of local result files'; diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 0a3768177..0a4735ee7 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -15,6 +15,7 @@ extern bool EnableCostBasedConnectionEstablishment; extern bool PreventIncompleteConnectionEstablishment; extern bool ShouldRunTasksSequentially(List *taskList); +extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, bool localExecutionSupported); diff --git 
a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 6e5a7f640..e40eadba9 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -48,6 +48,28 @@ typedef struct DistributedResultFragment } DistributedResultFragment; +/* + * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer. + * It is a separate struct to use it as a key in a hash table. + */ +typedef struct NodePair +{ + uint32 sourceNodeId; + uint32 targetNodeId; +} NodePair; + + +/* + * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched from + * the source node to the destination node in the NodePair. + */ +typedef struct NodeToNodeFragmentsTransfer +{ + NodePair nodes; + List *fragmentList; +} NodeToNodeFragmentsTransfer; + + /* intermediate_results.c */ extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId, EState *executorState, @@ -72,5 +94,10 @@ extern List * PartitionTasklistResults(const char *resultIdPrefix, List *selectT int partitionColumnIndex, CitusTableCacheEntry *distributionScheme, bool binaryFormat); +extern char * QueryStringForFragmentsTransfer( + NodeToNodeFragmentsTransfer *fragmentsTransfer); +extern void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, + Oid intervalTypeId, ArrayType **minValueArray, + ArrayType **maxValueArray); #endif /* INTERMEDIATE_RESULTS_H */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index a47dccb17..d2b8cce9c 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -43,6 +43,7 @@ extern void ExecuteUtilityCommand(const char *utilityCommand); extern bool ShouldExecuteTasksLocally(List *taskList); extern bool AnyTaskAccessesLocalNode(List *taskList); extern bool TaskAccessesLocalNode(Task *task); +extern void EnsureCompatibleLocalExecutionState(List *taskList); 
extern void ErrorIfTransactionAccessedPlacementsLocally(void); extern void DisableLocalExecution(void); extern void SetLocalExecutionStatus(LocalExecutionStatus newStatus); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 1a3630f81..14fdd7a0c 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -36,15 +36,6 @@ /* Definitions local to the physical planner */ #define NON_PRUNABLE_JOIN -1 #define RESERVED_HASHED_COLUMN_ID MaxAttrNumber -#define MERGE_COLUMN_FORMAT "merge_column_%u" -#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \ - (" UINT64_FORMAT ", %u, %u, %u, '%s', %u)" -#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ - (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)" -#define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \ - (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)" -#define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \ - (" UINT64_FORMAT ", %d, '%s', '%s')" extern int RepartitionJoinBucketCountPerNode; @@ -262,6 +253,10 @@ typedef struct Task uint32 upstreamTaskId; /* only applies to data fetch tasks */ ShardInterval *shardInterval; /* only applies to merge tasks */ bool assignmentConstrained; /* only applies to merge tasks */ + + /* for merge tasks, this is set to the target list of the map task */ + List *mapJobTargetList; + char replicationModel; /* only applies to modify tasks */ /* diff --git a/src/include/distributed/repartition_join_execution.h b/src/include/distributed/repartition_join_execution.h index 596dffc0b..7f2b648cf 100644 --- a/src/include/distributed/repartition_join_execution.h +++ b/src/include/distributed/repartition_join_execution.h @@ -13,8 +13,6 @@ #include "nodes/pg_list.h" extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob); -extern void EnsureCompatibleLocalExecutionState(List 
*taskList); -extern void DoRepartitionCleanup(List *jobIds); #endif /* REPARTITION_JOIN_EXECUTION_H */ diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index 52b0714c8..beccf8b26 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -227,8 +227,7 @@ class CitusSmallSharedPoolSizeConfig(CitusMXBaseClusterConfig): def __init__(self, arguments): super().__init__(arguments) self.new_settings = { - # can be uncommented after https://github.com/citusdata/citus/issues/5342 - # "citus.local_shared_pool_size": 5, + "citus.local_shared_pool_size": 5, "citus.max_shared_pool_size": 5, } diff --git a/src/test/regress/expected/adaptive_executor_repartition.out b/src/test/regress/expected/adaptive_executor_repartition.out index c5b583bef..a84677a35 100644 --- a/src/test/regress/expected/adaptive_executor_repartition.out +++ b/src/test/regress/expected/adaptive_executor_repartition.out @@ -52,9 +52,12 @@ SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT ROLLBACK; BEGIN; INSERT INTO ab values(1, 2); --- DDL happened before repartition query in a transaction block, so this should error. 
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; -ERROR: cannot open new connections after the first modification command within a transaction + count +--------------------------------------------------------------------- + 14 +(1 row) + ROLLBACK; SET citus.enable_single_hash_repartition_joins TO ON; CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 5d5fa982c..485e7f11b 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -249,8 +249,30 @@ SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; (1 row) BEGIN; -SET citus.enable_repartition_joins TO ON; +SET citus.enable_unique_job_ids TO off; SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; +NOTICE: executing the command locally: SELECT partition_index, 'repartition_25_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_25_1','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503000 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_25_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_25_4','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503003 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_26_1' 
|| '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_26_1','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503000 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_26_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_26_4','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503003 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_1_0']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_2_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_3_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_4_0']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_1_0']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_2_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_3_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM 
fetch_intermediate_results(ARRAY['repartition_26_4_0']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_1_3']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_2_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_3_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_4_3']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_1_3']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_2_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_3_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_4_3']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_25_1_0,repartition_25_2_0,repartition_25_3_0,repartition_25_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN read_intermediate_results('{repartition_26_1_0,repartition_26_2_0,repartition_26_3_0,repartition_26_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_25_1_3,repartition_25_2_3,repartition_25_3_3,repartition_25_4_3}'::text[], 
'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN read_intermediate_results('{repartition_26_1_3,repartition_26_2_3,repartition_26_3_3,repartition_26_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true count --------------------------------------------------------------------- 100 @@ -268,9 +290,33 @@ NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshar (1 row) SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT partition_index, 'repartition_29_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_29_1','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503000 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_29_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_29_4','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503003 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_30_1' || '_' || partition_index::text , rows_written FROM 
pg_catalog.worker_partition_query_result('repartition_30_1','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503000 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_30_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_30_4','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503003 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_1_2']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_2_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_3_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_4_2']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_1_2']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_2_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_3_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_4_2']::text[],'localhost',57636) bytes +NOTICE: executing the command 
locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_1_5']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_2_5']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_3_5']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_4_5']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_1_5']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_2_5']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_3_5']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_4_5']::text[],'localhost',57636) bytes +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_29_1_2,repartition_29_2_2,repartition_29_3_2,repartition_29_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN read_intermediate_results('{repartition_30_1_2,repartition_30_2_2,repartition_30_3_2,repartition_30_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_29_1_5,repartition_29_2_5,repartition_29_3_5,repartition_29_4_5}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN 
read_intermediate_results('{repartition_30_1_5,repartition_30_2_5,repartition_30_3_5,repartition_30_4_5}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true + count +--------------------------------------------------------------------- + 100 +(1 row) + ROLLBACK; CREATE TABLE ref (a int, b int); SELECT create_reference_table('ref'); diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out index de2f88b6e..4c7a03954 100644 --- a/src/test/regress/expected/follower_single_node.out +++ b/src/test/regress/expected/follower_single_node.out @@ -157,25 +157,50 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + SET citus.enable_single_hash_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + SET citus.task_assignment_policy TO 'round-robin'; SET citus.enable_single_hash_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + SET citus.task_assignment_policy TO 'greedy'; SELECT * FROM test t1, test t2 WHERE t1.x = 
t2.y ORDER BY t1.x; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + SET citus.task_assignment_policy TO 'first-replica'; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work @@ -319,12 +344,22 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + SET citus.enable_single_hash_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index c510b07b3..0bc522ea0 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -251,9 +251,6 @@ SELECT * FROM squares ORDER BY x; 5 | 25 (5 rows) --- empty shard interval 
array should raise error -SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[0]); -ERROR: invalid distribution column value -- cannot use DDL commands select broadcast_intermediate_result('a', 'create table foo(int serial)'); ERROR: cannot execute utility commands @@ -507,10 +504,9 @@ WARNING: Query could not find the intermediate result file "squares_2", it was (0 rows) ROLLBACK TO SAVEPOINT s1; --- fetch from worker 2 should fail -SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port); -ERROR: could not open file "base/pgsql_job_cache/xx_x_xxx/squares_1.data": No such file or directory -CONTEXT: while executing command on localhost:xxxxx +-- fetch from invalid worker port should fail +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', 57635); +ERROR: cannot connect to localhost:xxxxx to fetch intermediate results ROLLBACK TO SAVEPOINT s1; -- still, results aren't available on coordinator yet SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 781a9c86c..5cb61b2d5 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -91,6 +91,7 @@ ALTER TABLE abcd DROP COLUMN a; -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; +SET citus.enable_unique_job_ids TO off; -- returns true of the distribution key filter -- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard -- placement which is local to this not @@ -733,9 +734,51 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age); -ERROR: 
cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) 
WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM 
fetch_intermediate_results(ARRAY['repartition_66_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_3']::text[],'localhost',57637) bytes +NOTICE: 
executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_0,repartition_65_2_0,repartition_65_3_0,repartition_65_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_0,repartition_66_2_0,repartition_66_3_0,repartition_66_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_1,repartition_65_2_1,repartition_65_3_1,repartition_65_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_1,repartition_66_2_1,repartition_66_3_1,repartition_66_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_2,repartition_65_2_2,repartition_65_3_2,repartition_65_4_2}'::text[], 
'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_2,repartition_66_2_2,repartition_66_3_2,repartition_66_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_3,repartition_65_2_3,repartition_65_3_3,repartition_65_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_3,repartition_66_2_3,repartition_66_3_3,repartition_66_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true + count +--------------------------------------------------------------------- + 2 +(1 row) + ROLLBACK; -- a local query is followed by an INSERT..SELECT with re-partitioning BEGIN; diff --git a/src/test/regress/expected/local_shard_execution_replicated.out b/src/test/regress/expected/local_shard_execution_replicated.out index 731c825c3..285327095 100644 --- a/src/test/regress/expected/local_shard_execution_replicated.out +++ b/src/test/regress/expected/local_shard_execution_replicated.out @@ -682,6 +682,7 @@ NOTICE: executing the copy locally for shard xxxxx ROLLBACK; BEGIN; SET citus.enable_repartition_joins TO ON; +SET citus.enable_unique_job_ids TO off; SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_replicated.distributed_table_1500002 distributed_table WHERE true @@ -693,9 +694,51 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM 
local_shar (1 row) SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age); -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT partition_index, 'repartition_64_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_64_1','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_64_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_64_3','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_1','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_3','SELECT age AS column1 FROM 
local_shard_execution_replicated.distributed_table_1500003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the 
command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM 
fetch_intermediate_results(ARRAY['repartition_64_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_64_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_64_1_0,repartition_64_2_0,repartition_64_3_0,repartition_64_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_65_1_0,repartition_65_2_0,repartition_65_3_0,repartition_65_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_64_1_1,repartition_64_2_1,repartition_64_3_1,repartition_64_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_65_1_1,repartition_65_2_1,repartition_65_3_1,repartition_65_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) 
intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_64_1_2,repartition_64_2_2,repartition_64_3_2,repartition_64_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_65_1_2,repartition_65_2_2,repartition_65_3_2,repartition_65_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_64_1_3,repartition_64_2_3,repartition_64_3_3,repartition_64_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_65_1_3,repartition_65_2_3,repartition_65_3_3,repartition_65_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true + count +--------------------------------------------------------------------- + 2 +(1 row) + ROLLBACK; -- a local query is followed by an INSERT..SELECT with re-partitioning BEGIN; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index e5cbcd994..dc3f36d5e 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -428,20 +428,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.4-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT 
get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -469,20 +469,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.4-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -578,20 +578,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.5-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + 
+ + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -619,20 +619,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.5-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -994,36 +994,38 @@ DROP TABLE e_transactions; ALTER EXTENSION citus UPDATE TO '10.2-4'; -- Should be empty result since upgrade+downgrade should be a no-op SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- -(0 rows) + | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record +(1 row) -- Snapshot of state at 11.0-1 ALTER EXTENSION citus UPDATE TO '11.0-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- - function citus_disable_node(text,integer) void | - 
function create_distributed_function(regprocedure,text,text) void | - function master_append_table_to_shard(bigint,text,text,integer) real | - function master_apply_delete_command(text) integer | - function master_get_table_metadata(text) record | - | function citus_check_cluster_node_health() SETOF record - | function citus_check_connection_to_node(text,integer) boolean - | function citus_disable_node(text,integer,boolean) void - | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void - | function citus_internal_global_blocked_processes() SETOF record - | function citus_internal_local_blocked_processes() SETOF record - | function citus_run_local_command(text) void - | function citus_shard_indexes_on_worker() SETOF record - | function citus_shards_on_worker() SETOF record - | function create_distributed_function(regprocedure,text,text,boolean) void - | function pg_cancel_backend(bigint) boolean - | function pg_terminate_backend(bigint,bigint) boolean - | function worker_create_or_replace_object(text[]) boolean - | function worker_drop_sequence_dependency(text) void - | function worker_drop_shell_table(text) void -(20 rows) + function citus_disable_node(text,integer) void | + function create_distributed_function(regprocedure,text,text) void | + function master_append_table_to_shard(bigint,text,text,integer) real | + function master_apply_delete_command(text) integer | + function master_get_table_metadata(text) record | + function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean) SETOF record | + | function citus_check_cluster_node_health() SETOF record + | function citus_check_connection_to_node(text,integer) boolean + | function citus_disable_node(text,integer,boolean) void + | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void + | function citus_internal_global_blocked_processes() SETOF record + | function 
citus_internal_local_blocked_processes() SETOF record + | function citus_run_local_command(text) void + | function citus_shard_indexes_on_worker() SETOF record + | function citus_shards_on_worker() SETOF record + | function create_distributed_function(regprocedure,text,text,boolean) void + | function pg_cancel_backend(bigint) boolean + | function pg_terminate_backend(bigint,bigint) boolean + | function worker_create_or_replace_object(text[]) boolean + | function worker_drop_sequence_dependency(text) void + | function worker_drop_shell_table(text) void +(21 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_follower_select_statements.out b/src/test/regress/expected/multi_follower_select_statements.out index 9f92db197..42c3058ee 100644 --- a/src/test/regress/expected/multi_follower_select_statements.out +++ b/src/test/regress/expected/multi_follower_select_statements.out @@ -117,12 +117,18 @@ order by s_i_id; SET citus.enable_repartition_joins TO ON; SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b); -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + count +--------------------------------------------------------------------- + 2 +(1 row) + SET citus.enable_single_hash_repartition_joins TO ON; SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b; -ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is read-only + count +--------------------------------------------------------------------- + 2 +(1 row) + SELECT node_name, node_port FROM diff --git a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out index c9357e954..8e5242607 100644 --- a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out +++ b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out @@ -27,11 +27,17 @@ AS 'SELECT 
hashtext( ($1.i + $1.i2)::text);' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +CREATE FUNCTION test_udt_cmp(test_udt,test_udt) RETURNS int +AS 'SELECT CASE WHEN $1.i < $2.i THEN -1 ELSE CASE WHEN $1.i > $2.i THEN 1 ELSE CASE WHEN $1.i2 < $2.i2 THEN -1 ELSE CASE WHEN $1.i2 > $2.i2 THEN 1 ELSE 0 END END END END' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; -- We need to define two different operator classes for the composite types -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS OPERATOR 1 = (test_udt, test_udt), @@ -66,7 +72,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS OPERATOR 1 = (test_udt, test_udt), @@ -88,7 +95,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS OPERATOR 1 = (test_udt, test_udt), diff --git a/src/test/regress/expected/multi_repartition_udt.out b/src/test/regress/expected/multi_repartition_udt.out index ee6ac8800..437e188ee 100644 --- a/src/test/regress/expected/multi_repartition_udt.out +++ b/src/test/regress/expected/multi_repartition_udt.out @@ -28,11 +28,17 @@ AS 'SELECT hashtext( ($1.i + $1.i2)::text);' LANGUAGE SQL IMMUTABLE RETURNS 
NULL ON NULL INPUT; +CREATE FUNCTION test_udt_cmp(test_udt,test_udt) RETURNS int +AS 'SELECT CASE WHEN $1.i < $2.i THEN -1 ELSE CASE WHEN $1.i > $2.i THEN 1 ELSE CASE WHEN $1.i2 < $2.i2 THEN -1 ELSE CASE WHEN $1.i2 > $2.i2 THEN 1 ELSE 0 END END END END' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; -- We need to define two different operator classes for the composite types -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS OPERATOR 1 = (test_udt, test_udt), @@ -67,7 +73,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS OPERATOR 1 = (test_udt, test_udt), @@ -89,7 +96,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS OPERATOR 1 = (test_udt, test_udt), diff --git a/src/test/regress/expected/prepared_statements_1.out b/src/test/regress/expected/prepared_statements_1.out index e71be3f9d..418453705 100644 --- a/src/test/regress/expected/prepared_statements_1.out +++ b/src/test/regress/expected/prepared_statements_1.out @@ -24,3 +24,25 @@ BEGIN; (1 row) ROLLBACK; +PREPARE xact_repartitioned_prepared AS + SELECT count(*) FROM repartition_prepared_test t1 JOIN 
repartition_prepared_test t2 USING (b); +BEGIN; + -- Prepared re-partition join in a transaction block after a write + INSERT INTO repartition_prepared_test VALUES (1,2); + EXECUTE xact_repartitioned_prepared; + count +--------------------------------------------------------------------- + 226 +(1 row) + +ROLLBACK; +BEGIN; + -- Prepared re-partition join in a transaction block before a write + EXECUTE xact_repartitioned_prepared; + count +--------------------------------------------------------------------- + 209 +(1 row) + + INSERT INTO repartition_prepared_test VALUES (1,2); +ROLLBACK; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 185bf19c5..e5e7b9215 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -14,7 +14,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND deptype = 'e' AND e.extname='citus' ORDER BY 1; - description + description --------------------------------------------------------------------- access method columnar event trigger citus_cascade_to_partition @@ -225,7 +225,7 @@ ORDER BY 1; function worker_partial_agg(oid,anyelement) function worker_partial_agg_ffunc(internal) function worker_partial_agg_sfunc(internal,oid,anyelement) - function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean) + function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) function worker_partitioned_relation_size(regclass) function worker_partitioned_relation_total_size(regclass) function worker_partitioned_table_size(regclass) diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index d6d72d963..4b35d4465 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -636,7 +636,7 @@ 
INSERT INTO citus.pg_dist_object(classid, objid, objsubid) values('pg_class'::re INSERT INTO citus.pg_dist_object(classid, objid, objsubid) values('pg_class'::regclass::oid, 'second_dustbunnies'::regclass::oid, 0); SELECT 1 FROM master_activate_node('localhost', :worker_1_port); NOTICE: Replicating postgres objects to node localhost:57637 -DETAIL: There are 114 objects to replicate, depending on your environment this might take a while +DETAIL: There are 115 objects to replicate, depending on your environment this might take a while ?column? --------------------------------------------------------------------- 1 diff --git a/src/test/regress/sql/adaptive_executor_repartition.sql b/src/test/regress/sql/adaptive_executor_repartition.sql index bb625ae6f..1f2e21951 100644 --- a/src/test/regress/sql/adaptive_executor_repartition.sql +++ b/src/test/regress/sql/adaptive_executor_repartition.sql @@ -24,7 +24,6 @@ ROLLBACK; BEGIN; INSERT INTO ab values(1, 2); --- DDL happened before repartition query in a transaction block, so this should error. 
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; ROLLBACK; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index bc79513c3..c5e2b6177 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -112,7 +112,7 @@ SET citus.enable_repartition_joins TO ON; SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; BEGIN; -SET citus.enable_repartition_joins TO ON; +SET citus.enable_unique_job_ids TO off; SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; END; diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index ae5402345..67e80d1ee 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -132,9 +132,6 @@ WITH (FORMAT text); SELECT * FROM squares ORDER BY x; --- empty shard interval array should raise error -SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[0]); - -- cannot use DDL commands select broadcast_intermediate_result('a', 'create table foo(int serial)'); select broadcast_intermediate_result('a', 'prepare foo as select 1'); @@ -233,8 +230,8 @@ SAVEPOINT s1; -- results aren't available on coordinator yet SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); ROLLBACK TO SAVEPOINT s1; --- fetch from worker 2 should fail -SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port); +-- fetch from invalid worker port should fail +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', 57635); ROLLBACK TO SAVEPOINT s1; -- still, results aren't available on coordinator yet SELECT * FROM read_intermediate_results(ARRAY['squares_1', 
'squares_2']::text[], 'binary') AS res (x int, x2 int); diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index c7dad6f00..b68863a7f 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -67,6 +67,7 @@ ALTER TABLE abcd DROP COLUMN a; -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; +SET citus.enable_unique_job_ids TO off; -- returns true of the distribution key filter -- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard diff --git a/src/test/regress/sql/local_shard_execution_replicated.sql b/src/test/regress/sql/local_shard_execution_replicated.sql index 01af172a3..81b47cfc8 100644 --- a/src/test/regress/sql/local_shard_execution_replicated.sql +++ b/src/test/regress/sql/local_shard_execution_replicated.sql @@ -348,6 +348,7 @@ ROLLBACK; BEGIN; SET citus.enable_repartition_joins TO ON; +SET citus.enable_unique_job_ids TO off; SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age); ROLLBACK; diff --git a/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql b/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql index 779f4c24e..962f77318 100644 --- a/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql +++ b/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql @@ -35,12 +35,18 @@ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +CREATE FUNCTION test_udt_cmp(test_udt,test_udt) RETURNS int +AS 'SELECT CASE WHEN $1.i < $2.i THEN -1 ELSE CASE WHEN $1.i > $2.i THEN 1 ELSE CASE WHEN $1.i2 < $2.i2 THEN -1 ELSE CASE WHEN $1.i2 > $2.i2 THEN 1 ELSE 0 END END END END' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; -- We need to define two different operator classes for the composite types -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE 
test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS @@ -84,7 +90,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS @@ -112,7 +119,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS diff --git a/src/test/regress/sql/multi_repartition_udt.sql b/src/test/regress/sql/multi_repartition_udt.sql index 2e5c1de24..2b71ec116 100644 --- a/src/test/regress/sql/multi_repartition_udt.sql +++ b/src/test/regress/sql/multi_repartition_udt.sql @@ -36,12 +36,18 @@ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +CREATE FUNCTION test_udt_cmp(test_udt,test_udt) RETURNS int +AS 'SELECT CASE WHEN $1.i < $2.i THEN -1 ELSE CASE WHEN $1.i > $2.i THEN 1 ELSE CASE WHEN $1.i2 < $2.i2 THEN -1 ELSE CASE WHEN $1.i2 > $2.i2 THEN 1 ELSE 0 END END END END' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; -- We need to define two different operator classes for the composite types -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS @@ -85,7 
+91,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS @@ -113,7 +120,8 @@ CREATE OPERATOR FAMILY tudt_op_fam USING hash; -- One uses BTREE the other uses HASH CREATE OPERATOR CLASS tudt_op_fam_clas3 DEFAULT FOR TYPE test_udt USING BTREE AS -OPERATOR 3 = (test_udt, test_udt); +OPERATOR 3 = (test_udt, test_udt), +FUNCTION 1 test_udt_cmp(test_udt, test_udt); CREATE OPERATOR CLASS tudt_op_fam_class DEFAULT FOR TYPE test_udt USING HASH AS diff --git a/src/test/regress/sql/prepared_statements_1.sql b/src/test/regress/sql/prepared_statements_1.sql index ec3f521ad..aee511696 100644 --- a/src/test/regress/sql/prepared_statements_1.sql +++ b/src/test/regress/sql/prepared_statements_1.sql @@ -17,3 +17,18 @@ BEGIN; CREATE TEMP TABLE repartition_prepared_tmp AS EXECUTE repartition_prepared(1); SELECT count(*) from repartition_prepared_tmp; ROLLBACK; + +PREPARE xact_repartitioned_prepared AS + SELECT count(*) FROM repartition_prepared_test t1 JOIN repartition_prepared_test t2 USING (b); + +BEGIN; + -- Prepared re-partition join in a transaction block after a write + INSERT INTO repartition_prepared_test VALUES (1,2); + EXECUTE xact_repartitioned_prepared; +ROLLBACK; + +BEGIN; + -- Prepared re-partition join in a transaction block before a write + EXECUTE xact_repartitioned_prepared; + INSERT INTO repartition_prepared_test VALUES (1,2); +ROLLBACK;