diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 95d61995e..9590dbc40 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -263,10 +263,10 @@ GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId) for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) { - GroupShardPlacement *placement = &placementArray[placementIndex]; - - if (placement->groupId == groupId) + if (placementArray[placementIndex].groupId == groupId) { + GroupShardPlacement *placement = palloc0(sizeof(GroupShardPlacement)); + *placement = placementArray[placementIndex]; resultList = lappend(resultList, placement); } } @@ -279,9 +279,6 @@ GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId) /* * ShardIntervalsOnWorkerGroup accepts a WorkerNode and returns a list of the shard * intervals of the given table which are placed on the group the node is a part of. - * - * DO NOT modify the shard intervals returned by this function, they are not copies but - * pointers. */ static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId) @@ -304,8 +301,9 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId) if (placement->groupId == workerNode->groupId) { - ShardInterval *shardInterval = + ShardInterval *cachedShardInterval = distTableCacheEntry->sortedShardIntervalArray[shardIndex]; + ShardInterval *shardInterval = CopyShardInterval(cachedShardInterval); shardIntervalList = lappend(shardIntervalList, shardInterval); } } @@ -481,11 +479,8 @@ LoadShardIntervalList(Oid relationId) for (int i = 0; i < cacheEntry->shardIntervalArrayLength; i++) { - ShardInterval *newShardInterval = (ShardInterval *) palloc0( - sizeof(ShardInterval)); - - CopyShardInterval(cacheEntry->sortedShardIntervalArray[i], newShardInterval); - + ShardInterval *newShardInterval = + CopyShardInterval(cacheEntry->sortedShardIntervalArray[i]); shardList = lappend(shardList, newShardInterval); } @@ -552,12 +547,13 @@ AllocateUint64(uint64 value) /* - * CopyShardInterval copies fields from the specified source ShardInterval - * into the fields of the provided destination ShardInterval. + * CopyShardInterval creates a copy of the specified source ShardInterval. */ -void -CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval) +ShardInterval * +CopyShardInterval(ShardInterval *srcInterval) { + ShardInterval *destInterval = palloc0(sizeof(ShardInterval)); + destInterval->type = srcInterval->type; destInterval->relationId = srcInterval->relationId; destInterval->storageType = srcInterval->storageType; @@ -584,6 +580,8 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval) srcInterval->valueByVal, srcInterval->valueTypeLen); } + + return destInterval; } @@ -1403,10 +1401,6 @@ bool IsHashDistributedTable(Oid relationId) { CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(relationId); - if (sourceTableEntry == NULL) - { - return false; - } char sourceDistributionMethod = sourceTableEntry->partitionMethod; return sourceDistributionMethod == DISTRIBUTE_BY_HASH; } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index fa5022d88..73946c2f6 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -377,8 +377,7 @@ LoadShardInterval(uint64 shardId) tableEntry->sortedShardIntervalArray[shardEntry->shardIndex]; /* copy value to return */ - ShardInterval *shardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval)); - CopyShardInterval(sourceShardInterval, shardInterval); + ShardInterval *shardInterval = CopyShardInterval(sourceShardInterval); return shardInterval; } @@ -1172,10 +1171,7 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) intervalTypeMod); MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); - ShardInterval *newShardInterval = (ShardInterval *) palloc0( - sizeof(ShardInterval)); - CopyShardInterval(shardInterval, newShardInterval); - shardIntervalArray[arrayIndex] = newShardInterval; + shardIntervalArray[arrayIndex] = CopyShardInterval(shardInterval); MemoryContextSwitchTo(oldContext); @@ -3388,7 +3384,7 @@ InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId) * key graph cache, we use pg_dist_colocation, which is never invalidated for * other purposes. * - * We acknowledge that it is not a very intiutive way of implementing this cache + * We acknowledge that it is not a very intuitive way of implementing this cache * invalidation, but, seems acceptable for now. If this becomes problematic, we * could try using a magic oid where we're sure that no relation would ever use * that oid. @@ -3744,8 +3740,8 @@ LookupDistShardTuples(Oid relationId) /* * LookupShardRelation returns the logical relation oid a shard belongs to. * - * Errors out if the shardId does not exist and missingOk is false. Returns - * InvalidOid if the shardId does not exist and missingOk is true. + * Errors out if the shardId does not exist and missingOk is false. + * Returns InvalidOid if the shardId does not exist and missingOk is true. */ Oid LookupShardRelation(int64 shardId, bool missingOk) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 8eba076bc..dc359881c 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -73,6 +73,7 @@ #include "rewrite/rewriteManip.h" #include "utils/builtins.h" #include "utils/catcache.h" +#include "utils/datum.h" #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/lsyscache.h" @@ -1968,10 +1969,21 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, RANGE_PARTITION_TYPE) { CitusTableCacheEntry *cache = GetCitusTableCacheEntry(baseRelationId); - uint32 shardCount = cache->shardIntervalArrayLength; - ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray; + int shardCount = cache->shardIntervalArrayLength; + ShardInterval **cachedSortedShardIntervalArray = + cache->sortedShardIntervalArray; + bool hasUninitializedShardInterval = + cache->hasUninitializedShardInterval; + + ShardInterval **sortedShardIntervalArray = + palloc0(sizeof(ShardInterval) * shardCount); + + for (int shardIndex = 0; shardIndex < shardCount; shardIndex++) + { + sortedShardIntervalArray[shardIndex] = + CopyShardInterval(cachedSortedShardIntervalArray[shardIndex]); + } - bool hasUninitializedShardInterval = cache->hasUninitializedShardInterval; if (hasUninitializedShardInterval) { ereport(ERROR, (errmsg("cannot range repartition shard with " @@ -1979,7 +1991,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, } mapMergeJob->partitionType = partitionType; - mapMergeJob->partitionCount = shardCount; + mapMergeJob->partitionCount = (uint32) shardCount; mapMergeJob->sortedShardIntervalArray = sortedShardIntervalArray; mapMergeJob->sortedShardIntervalArrayLength = shardCount; } @@ -2507,11 +2519,13 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, anchorShardId = shardInterval->shardId; } - taskShardList = lappend(taskShardList, list_make1(shardInterval)); + ShardInterval *copiedShardInterval = CopyShardInterval(shardInterval); + + taskShardList = lappend(taskShardList, list_make1(copiedShardInterval)); RelationShard *relationShard = CitusMakeNode(RelationShard); - relationShard->relationId = shardInterval->relationId; - relationShard->shardId = shardInterval->shardId; + relationShard->relationId = copiedShardInterval->relationId; + relationShard->shardId = copiedShardInterval->shardId; relationShardList = lappend(relationShardList, relationShard); } @@ -2573,6 +2587,11 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) { + if (firstRelationId == secondRelationId) + { + return true; + } + CitusTableCacheEntry *firstTableCache = GetCitusTableCacheEntry(firstRelationId); CitusTableCacheEntry *secondTableCache = GetCitusTableCacheEntry(secondRelationId); @@ -3472,8 +3491,12 @@ UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval) Const *minConstant = (Const *) minNode; Const *maxConstant = (Const *) maxNode; - minConstant->constvalue = shardInterval->minValue; - maxConstant->constvalue = shardInterval->maxValue; + minConstant->constvalue = datumCopy(shardInterval->minValue, + shardInterval->valueByVal, + shardInterval->valueTypeLen); + maxConstant->constvalue = datumCopy(shardInterval->maxValue, + shardInterval->valueByVal, + shardInterval->valueTypeLen); minConstant->constisnull = false; maxConstant->constisnull = false; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 10c877b1a..80c963c6b 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1480,7 +1480,6 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved, DeferredErrorMessage **planningError) { List *insertTaskList = NIL; - ListCell *modifyRouteCell = NULL; Oid distributedTableId = ExtractFirstCitusTableId(query); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); @@ -1495,10 +1494,9 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved, return NIL; } - foreach(modifyRouteCell, modifyRouteList) + ModifyRoute *modifyRoute = NULL; + foreach_ptr(modifyRoute, modifyRouteList) { - ModifyRoute *modifyRoute = (ModifyRoute *) lfirst(modifyRouteCell); - Task *modifyTask = CreateTask(MODIFY_TASK); modifyTask->anchorShardId = modifyRoute->shardId; modifyTask->replicationModel = cacheEntry->replicationModel; @@ -2319,9 +2317,9 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery, if (inputDistributionKeyValue && !inputDistributionKeyValue->constisnull) { CitusTableCacheEntry *cache = GetCitusTableCacheEntry(relationId); - ShardInterval *shardInterval = + ShardInterval *cachedShardInterval = FindShardInterval(inputDistributionKeyValue->constvalue, cache); - if (shardInterval == NULL) + if (cachedShardInterval == NULL) { ereport(ERROR, (errmsg( "could not find shardinterval to which to send the query"))); @@ -2332,6 +2330,7 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery, /* set the outgoing partition column value if requested */ *outputPartitionValueConst = inputDistributionKeyValue; } + ShardInterval *shardInterval = CopyShardInterval(cachedShardInterval); List *shardIntervalList = list_make1(shardInterval); return list_make1(shardIntervalList); diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index ebb4b833c..69191f3d3 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -76,15 +76,16 @@ #include "catalog/pg_am.h" #include "catalog/pg_collation.h" #include "catalog/pg_type.h" -#include "distributed/metadata_cache.h" #include "distributed/distributed_planner.h" +#include "distributed/listutils.h" +#include "distributed/log_utils.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/multi_physical_planner.h" -#include "distributed/shardinterval_utils.h" #include "distributed/pg_dist_partition.h" +#include "distributed/shardinterval_utils.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" -#include "distributed/log_utils.h" #include "nodes/nodeFuncs.h" #include "nodes/makefuncs.h" #include "optimizer/clauses.h" @@ -1358,16 +1359,12 @@ static List * DeepCopyShardIntervalList(List *originalShardIntervalList) { List *copiedShardIntervalList = NIL; - ListCell *shardIntervalCell = NULL; - foreach(shardIntervalCell, originalShardIntervalList) + ShardInterval *originalShardInterval = NULL; + foreach_ptr(originalShardInterval, originalShardIntervalList) { - ShardInterval *originalShardInterval = - (ShardInterval *) lfirst(shardIntervalCell); - ShardInterval *copiedShardInterval = - (ShardInterval *) palloc0(sizeof(ShardInterval)); + ShardInterval *copiedShardInterval = CopyShardInterval(originalShardInterval); - CopyShardInterval(originalShardInterval, copiedShardInterval); copiedShardIntervalList = lappend(copiedShardIntervalList, copiedShardInterval); } diff --git a/src/backend/distributed/test/foreign_key_relationship_query.c b/src/backend/distributed/test/foreign_key_relationship_query.c index 1c70705ed..d2cb207d4 100644 --- a/src/backend/distributed/test/foreign_key_relationship_query.c +++ b/src/backend/distributed/test/foreign_key_relationship_query.c @@ -39,11 +39,16 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - List *refList = cacheEntry->referencingRelationsViaForeignKey; /* create a function context for cross-call persistence */ functionContext = SRF_FIRSTCALL_INIT(); + MemoryContext oldContext = + MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); + List *refList = list_copy( + cacheEntry->referencingRelationsViaForeignKey); + MemoryContextSwitchTo(oldContext); + foreignRelationCell = list_head(refList); functionContext->user_fctx = foreignRelationCell; } @@ -90,11 +95,15 @@ get_referenced_relation_id_list(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - List *refList = cacheEntry->referencedRelationsViaForeignKey; /* create a function context for cross-call persistence */ functionContext = SRF_FIRSTCALL_INIT(); + MemoryContext oldContext = + MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); + List *refList = list_copy(cacheEntry->referencedRelationsViaForeignKey); + MemoryContextSwitchTo(oldContext); + foreignRelationCell = list_head(refList); functionContext->user_fctx = foreignRelationCell; } diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 2c60e6e9b..4ebb10fb2 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -929,8 +929,7 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) if ((partitionMethod == DISTRIBUTE_BY_APPEND) || (partitionMethod == DISTRIBUTE_BY_RANGE)) { - ShardInterval *copyShardInterval = CitusMakeNode(ShardInterval); - CopyShardInterval(shardInterval, copyShardInterval); + ShardInterval *copyShardInterval = CopyShardInterval(shardInterval); colocatedShardList = lappend(colocatedShardList, copyShardInterval); @@ -959,8 +958,7 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) ShardInterval *colocatedShardInterval = colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; - ShardInterval *copyShardInterval = CitusMakeNode(ShardInterval); - CopyShardInterval(colocatedShardInterval, copyShardInterval); + ShardInterval *copyShardInterval = CopyShardInterval(colocatedShardInterval); colocatedShardList = lappend(colocatedShardList, copyShardInterval); } diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 4c3c2b5ac..fab6b3861 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -101,7 +101,7 @@ extern uint32 TableShardReplicationFactor(Oid relationId); extern List * LoadShardIntervalList(Oid relationId); extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); -extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval); +extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval); extern void CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement); extern uint64 ShardLength(uint64 shardId);