diff --git a/src/backend/distributed/commands/cluster.c b/src/backend/distributed/commands/cluster.c index c539aa066..9bf134042 100644 --- a/src/backend/distributed/commands/cluster.c +++ b/src/backend/distributed/commands/cluster.c @@ -79,7 +79,10 @@ PreprocessClusterStmt(Node *node, const char *clusterCommand, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = clusterCommand; - ddlJob->taskList = DDLTaskList(relationId, clusterCommand); + + /* we don't need to do cluster deleted shards */ + bool includeOrphanedShards = false; + ddlJob->taskList = DDLTaskList(relationId, clusterCommand, includeOrphanedShards); return list_make1(ddlJob); } diff --git a/src/backend/distributed/commands/grant.c b/src/backend/distributed/commands/grant.c index c7861060a..84dba1ff4 100644 --- a/src/backend/distributed/commands/grant.c +++ b/src/backend/distributed/commands/grant.c @@ -160,7 +160,9 @@ PreprocessGrantStmt(Node *node, const char *queryString, ddlJob->taskList = NIL; if (IsCitusTable(relationId)) { - ddlJob->taskList = DDLTaskList(relationId, ddlString.data); + /* Propogate latest policies issue on deleted shards to avoid any potential issues */ + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(relationId, ddlString.data, includeOrphanedShards); } ddlJobs = lappend(ddlJobs, ddlJob); diff --git a/src/backend/distributed/commands/policy.c b/src/backend/distributed/commands/policy.c index 5250f9580..e124f9f70 100644 --- a/src/backend/distributed/commands/policy.c +++ b/src/backend/distributed/commands/policy.c @@ -275,7 +275,10 @@ PostprocessCreatePolicyStmt(Node *node, const char *queryString) DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = pstrdup(ddlCommand); - ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + /* Propogate latest policies issue on deleted shards to avoid any potential issues */ + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); relation_close(relation, NoLock); @@ -408,7 +411,10 @@ PreprocessAlterPolicyStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relOid); ddlJob->metadataSyncCommand = pstrdup(ddlString.data); - ddlJob->taskList = DDLTaskList(relOid, ddlString.data); + + /* Propogate latest policies issue on deleted shards to avoid any potential issues */ + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(relOid, ddlString.data, includeOrphanedShards); relation_close(relation, NoLock); @@ -516,7 +522,10 @@ PreprocessDropPolicyStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relOid); ddlJob->metadataSyncCommand = queryString; - ddlJob->taskList = DDLTaskList(relOid, queryString); + + /* Propogate latest policies issue on deleted shards to avoid potential issues */ + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(relOid, queryString, includeOrphanedShards); ddlJobs = lappend(ddlJobs, ddlJob); } diff --git a/src/backend/distributed/commands/rename.c b/src/backend/distributed/commands/rename.c index 5e313d68c..52c7ed6b6 100644 --- a/src/backend/distributed/commands/rename.c +++ b/src/backend/distributed/commands/rename.c @@ -179,7 +179,14 @@ PreprocessRenameStmt(Node *node, const char *renameCommand, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId); ddlJob->metadataSyncCommand = renameCommand; - ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand); + + /* + * Rename orphaned shards as well, otherwise the shard's name will be different + * from the distributed table. This will cause shard cleaner to fail as we will + * try to delete the orphaned shard with the wrong (new) name. + */ + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand, includeOrphanedShards); return list_make1(ddlJob); } diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index a65d6c0fe..e196f6541 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -93,7 +93,10 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString, ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; - ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + /* We don't need to do create statistics on deleted shards */ + bool includeOrphanedShards = false; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); List *ddlJobs = list_make1(ddlJob); @@ -202,7 +205,10 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString, ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; - ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + /* we don't need to do drop statistics on deleted shards */ + bool includeOrphanedShards = false; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); ddlJobs = lappend(ddlJobs, ddlJob); } @@ -268,7 +274,10 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString, ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; - ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + /* we don't need to do ALTER statistics on deleted shards */ + bool includeOrphanedShards = false; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); List *ddlJobs = list_make1(ddlJob); @@ -306,7 +315,10 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString, ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; - ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + /* we don't need to do ALTER statistics on deleted shards */ + bool includeOrphanedShards = false; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); List *ddlJobs = list_make1(ddlJob); @@ -426,7 +438,10 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString, ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; - ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + /* we don't need to do ALTER statistics on deleted shards */ + bool includeOrphanedShards = false; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); List *ddlJobs = list_make1(ddlJob); @@ -464,7 +479,12 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString, ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; - ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + /* Propogate owner changes on deleted shards to avoid any potential issues */ + // TODO(niupre): Can this cause failure when we try to drop orphaned table? + // If this is the case, do we need to allow CREATE STATS as well? + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); List *ddlJobs = list_make1(ddlJob); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 1cf1e54a5..6483f35ce 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1162,7 +1162,10 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, else { /* ... otherwise use standard DDL task list function */ - ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList); + + /* Propogate latest updates issue on deleted shards to avoid any potential issues */ + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList, includeOrphanedShards); if (!propagateCommandToWorkers) { ddlJob->taskList = NIL; @@ -1835,7 +1838,10 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt); - ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand); + + /* Propogate latest schema on deleted shards to avoid any potential issues */ + bool includeOrphanedShards = true; + ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand, includeOrphanedShards); return list_make1(ddlJob); } diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index 299ffcc32..dc0d0d0bb 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -719,13 +719,16 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName, ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = queryString; + /* we don't need to do CREATE trigger on deleted shards */ + bool includeOrphanedShards = false; + if (!triggerName) { /* * ENABLE/DISABLE TRIGGER ALL/USER commands do not specify trigger * name. */ - ddlJob->taskList = DDLTaskList(relationId, queryString); + ddlJob->taskList = DDLTaskList(relationId, queryString, includeOrphanedShards); return list_make1(ddlJob); } @@ -745,7 +748,7 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName, /* we don't have truncate triggers on shard relations */ if (!TRIGGER_FOR_TRUNCATE(triggerType)) { - ddlJob->taskList = DDLTaskList(relationId, queryString); + ddlJob->taskList = DDLTaskList(relationId, queryString, includeOrphanedShards); } return list_make1(ddlJob); diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index 0993c287f..8d9910481 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -127,6 +127,7 @@ TruncateTaskList(Oid relationId) char *schemaName = get_namespace_name(schemaId); char *relationName = get_rel_name(relationId); + /* No need to propoagate Truncate command as shards will be dropped anyways */ List *shardIntervalList = LoadShardIntervalList(relationId); /* lock metadata before getting placement lists */ diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 3b621975b..d994e895f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1613,11 +1613,12 @@ InvalidateForeignKeyGraphForDDL(void) * given list of shards. */ List * -DDLTaskList(Oid relationId, const char *commandString) +DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShards) { List *taskList = NIL; - // TODO(niupre): This should be all shards? - List *shardIntervalList = LoadShardIntervalList(relationId); + List *shardIntervalList = + (includeOrphanedShards) ? LoadShardIntervalListWithOrphanedShards(relationId) + : LoadShardIntervalList(relationId); Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); char *escapedSchemaName = quote_literal_cstr(schemaName); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index c5e555f19..e213e66cc 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -113,6 +113,15 @@ typedef struct CitusTableCacheEntrySlot bool isValid; } CitusTableCacheEntrySlot; +/* + * ShardInterval Array Type. + */ +typedef enum ShardIntervalArrayType +{ + ACTIVE_SHARD_ARRAY, + + TO_DELETE_SHARD_ARRAY +} ShardIntervalArrayType; /* * ShardIdCacheEntry is the entry type for ShardIdCacheHash. @@ -127,6 +136,9 @@ typedef struct ShardIdCacheEntry /* pointer to the table entry to which this shard currently belongs */ CitusTableCacheEntry *tableEntry; + /* shard interval type */ + ShardIntervalArrayType arrayType; + /* index of the shard interval in the sortedShardIntervalArray of the table entry */ int shardIndex; } ShardIdCacheEntry; @@ -249,7 +261,9 @@ static void RegisterLocalGroupIdCacheCallbacks(void); static void RegisterAuthinfoCacheCallbacks(void); static void RegisterCitusTableCacheEntryReleaseCallbacks(void); static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); -static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry); +static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry, + ShardInterval **sortedShardIntervalArray, + int shardIntervalArrayLength); static void CreateDistTableCache(void); static void CreateShardIdCache(void); static void CreateDistObjectCache(void); @@ -275,7 +289,7 @@ static void CachedRelationNamespaceLookupExtended(const char *relationName, bool missing_ok); static ShardPlacement * ResolveGroupShardPlacement( GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, - int shardIndex); + ShardIntervalArrayType arrayType, int shardIndex); static Oid LookupEnumValueId(Oid typeId, char *valueName); static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot); static void InvalidateDistTableCache(void); @@ -284,6 +298,13 @@ static void InitializeTableCacheEntry(int64 shardId); static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel, CitusTableType tableType); static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry); +static void BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength, + ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry); +static void FreeCitusTableCacheShardAndPlacementEntryFromArray( + ShardInterval **shardIntervalArray, + int shardIntervalArrayLength, + GroupShardPlacement **arrayOfPlacementArrays, + int *arrayOfPlacementArrayLengths); static Oid DistAuthinfoRelationId(void); static Oid DistAuthinfoIndexId(void); @@ -702,12 +723,20 @@ LoadShardInterval(uint64 shardId) ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; int shardIndex = shardIdEntry->shardIndex; + ShardIntervalArrayType arrayType = shardIdEntry->arrayType; /* the offset better be in a valid range */ - Assert(shardIndex < tableEntry->shardIntervalArrayLength); - - ShardInterval *sourceShardInterval = - tableEntry->sortedShardIntervalArray[shardIndex]; + ShardInterval *sourceShardInterval = NULL; + if (arrayType == ACTIVE_SHARD_ARRAY) + { + Assert(shardIndex < tableEntry->shardIntervalArrayLength); + sourceShardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; + } + else + { + Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); + sourceShardInterval = tableEntry->sortedOrphanedShardIntervalArray[shardIndex]; + } /* copy value to return */ ShardInterval *shardInterval = CopyShardInterval(sourceShardInterval); @@ -771,14 +800,26 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId) ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; int shardIndex = shardIdEntry->shardIndex; + ShardIntervalArrayType arrayType = shardIdEntry->arrayType; - /* the offset better be in a valid range */ - Assert(shardIndex < tableEntry->shardIntervalArrayLength); - - GroupShardPlacement *placementArray = - tableEntry->arrayOfPlacementArrays[shardIndex]; - int numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + GroupShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + if (arrayType == ACTIVE_SHARD_ARRAY) + { + /* the offset better be in a valid range */ + Assert(shardIndex < tableEntry->shardIntervalArrayLength); + placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; + numberOfPlacements = + tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + } + else if (arrayType == TO_DELETE_SHARD_ARRAY) + { + /* the offset better be in a valid range */ + Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); + placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex]; + numberOfPlacements = + tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex]; + } for (int i = 0; i < numberOfPlacements; i++) { @@ -806,9 +847,11 @@ LoadShardPlacement(uint64 shardId, uint64 placementId) ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; int shardIndex = shardIdEntry->shardIndex; + ShardIntervalArrayType arrayType = shardIdEntry->arrayType; + GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId); ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement, - tableEntry, shardIndex); + tableEntry, arrayType, shardIndex); return nodePlacement; } @@ -829,10 +872,26 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId) ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; int shardIndex = shardIdEntry->shardIndex; - GroupShardPlacement *placementArray = - tableEntry->arrayOfPlacementArrays[shardIndex]; - int numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + ShardIntervalArrayType arrayType = shardIdEntry->arrayType; + + GroupShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + if (arrayType == ACTIVE_SHARD_ARRAY) + { + /* the offset better be in a valid range */ + Assert(shardIndex < tableEntry->shardIntervalArrayLength); + placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; + numberOfPlacements = + tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + } + else if (arrayType == TO_DELETE_SHARD_ARRAY) + { + /* the offset better be in a valid range */ + Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); + placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex]; + numberOfPlacements = + tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex]; + } for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) { @@ -840,7 +899,7 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId) if (placement->groupId == groupId) { placementOnNode = ResolveGroupShardPlacement(placement, tableEntry, - shardIndex); + arrayType, shardIndex); break; } } @@ -878,9 +937,19 @@ ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId) static ShardPlacement * ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, + ShardIntervalArrayType arrayType, int shardIndex) { - ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; + + ShardInterval *shardInterval = NULL; + if (arrayType == ACTIVE_SHARD_ARRAY) + { + shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; + } + else if (arrayType == TO_DELETE_SHARD_ARRAY) + { + shardInterval = tableEntry->sortedOrphanedShardIntervalArray[shardIndex]; + } ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); int32 groupId = groupShardPlacement->groupId; @@ -1049,20 +1118,31 @@ ShardPlacementListIncludingOrphanedPlacements(uint64 shardId) ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; int shardIndex = shardIdEntry->shardIndex; + ShardIntervalArrayType arrayType = shardIdEntry->arrayType; - /* the offset better be in a valid range */ - Assert(shardIndex < tableEntry->shardIntervalArrayLength); - - GroupShardPlacement *placementArray = - tableEntry->arrayOfPlacementArrays[shardIndex]; - int numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + GroupShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + if (arrayType == ACTIVE_SHARD_ARRAY) + { + /* the offset better be in a valid range */ + Assert(shardIndex < tableEntry->shardIntervalArrayLength); + placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; + numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + } + else if (arrayType == TO_DELETE_SHARD_ARRAY) + { + /* the offset better be in a valid range */ + Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); + placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex]; + numberOfPlacements = tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex]; + } for (int i = 0; i < numberOfPlacements; i++) { GroupShardPlacement *groupShardPlacement = &placementArray[i]; ShardPlacement *shardPlacement = ResolveGroupShardPlacement(groupShardPlacement, tableEntry, + arrayType, shardIndex); placementList = lappend(placementList, shardPlacement); @@ -1540,7 +1620,7 @@ BuildCitusTableCacheEntry(Oid relationId) cacheEntry->hashFunction = hashFunction; - /* check the shard distribution for hash partitioned tables */ + /* check the shard distribution of ACTIVE shards for hash partitioned tables */ cacheEntry->hasUniformHashDistribution = HasUniformHashDistribution(cacheEntry->sortedShardIntervalArray, cacheEntry->shardIntervalArrayLength); @@ -1574,8 +1654,11 @@ BuildCitusTableCacheEntry(Oid relationId) static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry) { + List *shardIntervalList = NULL; ShardInterval **shardIntervalArray = NULL; + ShardInterval **shardIntervalToDeleteArray = NULL; ShardInterval **sortedShardIntervalArray = NULL; + ShardInterval **sortedOrphanedShardIntervalArray = NULL; FmgrInfo *shardIntervalCompareFunction = NULL; FmgrInfo *shardColumnCompareFunction = NULL; Oid columnTypeId = InvalidOid; @@ -1592,24 +1675,13 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId); int shardIntervalArrayLength = list_length(distShardTupleList); + + int activeShardCount = 0; + int orphanedShardCount = 0; if (shardIntervalArrayLength > 0) { Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock); TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation); - int arrayIndex = 0; - - shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext, - shardIntervalArrayLength * - sizeof(ShardInterval *)); - - cacheEntry->arrayOfPlacementArrays = - MemoryContextAllocZero(MetadataCacheMemoryContext, - shardIntervalArrayLength * - sizeof(GroupShardPlacement *)); - cacheEntry->arrayOfPlacementArrayLengths = - MemoryContextAllocZero(MetadataCacheMemoryContext, - shardIntervalArrayLength * - sizeof(int)); HeapTuple shardTuple = NULL; foreach_ptr(shardTuple, distShardTupleList) @@ -1618,18 +1690,71 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) distShardTupleDesc, intervalTypeId, intervalTypeMod); - MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); - shardIntervalArray[arrayIndex] = CopyShardInterval(shardInterval); + Assert(shardInterval->shardState == SHARD_STATE_ACTIVE || + shardInterval->shardState == SHARD_STATE_TO_DELETE); - MemoryContextSwitchTo(oldContext); + if (shardInterval->shardState == SHARD_STATE_ACTIVE) + { + activeShardCount++; + } + else + { + orphanedShardCount++; + } + + shardIntervalList = lappend(shardIntervalList, shardInterval); heap_freetuple(shardTuple); - - arrayIndex++; } - table_close(distShardRelation, AccessShareLock); + + shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext, + activeShardCount * + sizeof(ShardInterval *)); + + shardIntervalToDeleteArray = MemoryContextAllocZero(MetadataCacheMemoryContext, + orphanedShardCount * + sizeof(ShardInterval *)); + + cacheEntry->arrayOfPlacementArrays = + MemoryContextAllocZero(MetadataCacheMemoryContext, + activeShardCount * + sizeof(GroupShardPlacement *)); + cacheEntry->arrayOfPlacementArrayLengths = + MemoryContextAllocZero(MetadataCacheMemoryContext, + activeShardCount * + sizeof(int)); + + cacheEntry->arrayOfOrphanedPlacementArrays = + MemoryContextAllocZero(MetadataCacheMemoryContext, + orphanedShardCount * + sizeof(GroupShardPlacement *)); + cacheEntry->arrayOfOrphanedPlacementArrayLengths = + MemoryContextAllocZero(MetadataCacheMemoryContext, + orphanedShardCount * + sizeof(int)); + } + + int arrayIndexOne = 0; + int arrayIndexTwo = 0; + ShardInterval *interval = NULL; + foreach_ptr(interval, shardIntervalList) + { + MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); + ShardInterval *copiedShardInterval = CopyShardInterval(interval); + + if (interval->shardState == SHARD_STATE_ACTIVE) + { + shardIntervalArray[arrayIndexOne] = copiedShardInterval; + arrayIndexOne++; + } + else + { + shardIntervalToDeleteArray[arrayIndexTwo] = copiedShardInterval; + arrayIndexTwo++; + } + MemoryContextSwitchTo(oldContext); } /* look up value comparison function */ @@ -1683,12 +1808,19 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) /* since there is a zero or one shard, it is already sorted */ sortedShardIntervalArray = shardIntervalArray; + sortedOrphanedShardIntervalArray = shardIntervalToDeleteArray; } else { /* sort the interval array */ sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray, - shardIntervalArrayLength, + activeShardCount, + cacheEntry->partitionColumn-> + varcollid, + shardIntervalCompareFunction); + + sortedOrphanedShardIntervalArray = SortShardIntervalArray(shardIntervalToDeleteArray, + orphanedShardCount, cacheEntry->partitionColumn-> varcollid, shardIntervalCompareFunction); @@ -1715,12 +1847,35 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) } cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; - cacheEntry->shardIntervalArrayLength = 0; + cacheEntry->sortedOrphanedShardIntervalArray = sortedOrphanedShardIntervalArray; + /* initialize to zero and increment as we go */ + cacheEntry->shardIntervalArrayLength = 0; + cacheEntry->orphanedShardIntervalArrayLength = 0; + + /* maintain shardId->(table,ShardInterval) cache and placement list. */ + BuildShardIdCacheAndPlacementList(sortedShardIntervalArray, activeShardCount, + ACTIVE_SHARD_ARRAY, cacheEntry); + + BuildShardIdCacheAndPlacementList(sortedOrphanedShardIntervalArray, orphanedShardCount, + TO_DELETE_SHARD_ARRAY, cacheEntry); + + cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; + cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; +} + + +/* + * Maintain shardId->(table,ShardInterval) cache and placement list. + */ +void +BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength, + ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry) +{ /* maintain shardId->(table,ShardInterval) cache */ - for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) + for (int shardIndex = 0; shardIndex < arrayLength; shardIndex++) { - ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; + ShardInterval *shardInterval = shardIntervalArray[shardIndex]; int64 shardId = shardInterval->shardId; int placementOffset = 0; @@ -1733,13 +1888,21 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) shardIdCacheEntry->tableEntry = cacheEntry; shardIdCacheEntry->shardIndex = shardIndex; + shardIdCacheEntry->arrayType = arrayType; /* * We should increment this only after we are sure this hasn't already * been assigned to any other relations. ResetCitusTableCacheEntry() * depends on this. */ - cacheEntry->shardIntervalArrayLength++; + if(arrayType == ACTIVE_SHARD_ARRAY) + { + cacheEntry->shardIntervalArrayLength++; + } + else if(arrayType == TO_DELETE_SHARD_ARRAY) + { + cacheEntry->orphanedShardIntervalArrayLength++; + } /* build list of shard placements */ List *placementList = BuildShardPlacementList(shardId); @@ -1763,9 +1926,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) /* store the shard index in the ShardInterval */ shardInterval->shardIndex = shardIndex; } - - cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; - cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; } @@ -1902,11 +2062,7 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray, curShardInterval->minValue); comparisonResult = DatumGetInt32(comparisonDatum); - // If one of the shards are marked as TO_DELETED, ignore the overlap. - bool markedForDelete = (lastShardInterval->shardState == SHARD_STATE_TO_DELETE || - curShardInterval->shardState == SHARD_STATE_TO_DELETE); - - if (!markedForDelete && comparisonResult >= 0) + if (comparisonResult >= 0) { return true; } @@ -3971,6 +4127,49 @@ RegisterAuthinfoCacheCallbacks(void) } +/* + * + */ +static void +FreeCitusTableCacheShardAndPlacementEntryFromArray( + ShardInterval **shardIntervalArray, + int shardIntervalArrayLength, + GroupShardPlacement **arrayOfPlacementArrays, + int *arrayOfPlacementArrayLengths) +{ + for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; + shardIndex++) + { + ShardInterval *shardInterval = shardIntervalArray[shardIndex]; + GroupShardPlacement *placementArray = arrayOfPlacementArrays[shardIndex]; + bool valueByVal = shardInterval->valueByVal; + + /* delete the shard's placements */ + if (placementArray != NULL) + { + pfree(placementArray); + } + + /* delete data pointed to by ShardInterval */ + if (!valueByVal) + { + if (shardInterval->minValueExists) + { + pfree(DatumGetPointer(shardInterval->minValue)); + } + + if (shardInterval->maxValueExists) + { + pfree(DatumGetPointer(shardInterval->maxValue)); + } + } + + /* and finally the ShardInterval itself */ + pfree(shardInterval); + } +} + + /* * ResetCitusTableCacheEntry frees any out-of-band memory used by a cache entry, * but does not free the entry itself. @@ -4008,45 +4207,31 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) } /* clean up ShardIdCacheHash */ - RemoveStaleShardIdCacheEntries(cacheEntry); + RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedShardIntervalArray, cacheEntry->shardIntervalArrayLength); + RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedOrphanedShardIntervalArray, cacheEntry->orphanedShardIntervalArrayLength); - for (int shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength; - shardIndex++) - { - ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex]; - GroupShardPlacement *placementArray = - cacheEntry->arrayOfPlacementArrays[shardIndex]; - bool valueByVal = shardInterval->valueByVal; + FreeCitusTableCacheShardAndPlacementEntryFromArray( + cacheEntry->sortedShardIntervalArray, + cacheEntry->shardIntervalArrayLength, + cacheEntry->arrayOfPlacementArrays, + cacheEntry->arrayOfPlacementArrayLengths); - /* delete the shard's placements */ - if (placementArray != NULL) - { - pfree(placementArray); - } - - /* delete data pointed to by ShardInterval */ - if (!valueByVal) - { - if (shardInterval->minValueExists) - { - pfree(DatumGetPointer(shardInterval->minValue)); - } - - if (shardInterval->maxValueExists) - { - pfree(DatumGetPointer(shardInterval->maxValue)); - } - } - - /* and finally the ShardInterval itself */ - pfree(shardInterval); - } + FreeCitusTableCacheShardAndPlacementEntryFromArray( + cacheEntry->sortedOrphanedShardIntervalArray, + cacheEntry->orphanedShardIntervalArrayLength, + cacheEntry->arrayOfOrphanedPlacementArrays, + cacheEntry->arrayOfOrphanedPlacementArrayLengths); if (cacheEntry->sortedShardIntervalArray) { pfree(cacheEntry->sortedShardIntervalArray); cacheEntry->sortedShardIntervalArray = NULL; } + if (cacheEntry->sortedOrphanedShardIntervalArray) + { + pfree(cacheEntry->sortedOrphanedShardIntervalArray); + cacheEntry->sortedOrphanedShardIntervalArray = NULL; + } if (cacheEntry->arrayOfPlacementArrayLengths) { pfree(cacheEntry->arrayOfPlacementArrayLengths); @@ -4057,6 +4242,16 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) pfree(cacheEntry->arrayOfPlacementArrays); cacheEntry->arrayOfPlacementArrays = NULL; } + if (cacheEntry->arrayOfOrphanedPlacementArrayLengths) + { + pfree(cacheEntry->arrayOfOrphanedPlacementArrayLengths); + cacheEntry->arrayOfOrphanedPlacementArrayLengths = NULL; + } + if (cacheEntry->arrayOfOrphanedPlacementArrays) + { + pfree(cacheEntry->arrayOfOrphanedPlacementArrays); + cacheEntry->arrayOfOrphanedPlacementArrays = NULL; + } if (cacheEntry->referencedRelationsViaForeignKey) { list_free(cacheEntry->referencedRelationsViaForeignKey); @@ -4069,6 +4264,7 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) } cacheEntry->shardIntervalArrayLength = 0; + cacheEntry->orphanedShardIntervalArrayLength = 0; cacheEntry->hasUninitializedShardInterval = false; cacheEntry->hasUniformHashDistribution = false; cacheEntry->hasOverlappingShardInterval = false; @@ -4084,15 +4280,16 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) * we leave it in place. */ static void -RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *invalidatedTableEntry) +RemoveStaleShardIdCacheEntries( + CitusTableCacheEntry *invalidatedTableEntry, + ShardInterval **sortedShardIntervalArray, + int shardIntervalArrayLength) { int shardIndex = 0; - int shardCount = invalidatedTableEntry->shardIntervalArrayLength; - for (shardIndex = 0; shardIndex < shardCount; shardIndex++) + for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) { - ShardInterval *shardInterval = - invalidatedTableEntry->sortedShardIntervalArray[shardIndex]; + ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; int64 shardId = shardInterval->shardId; bool foundInCache = false; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 2fb7eb7f6..97ed5e0fa 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -856,8 +856,12 @@ CitusTableMetadataCreateCommandList(Oid relationId) char *metadataCommand = DistributionCreateCommand(cacheEntry); commandList = lappend(commandList, metadataCommand); - /* commands to insert pg_dist_shard & pg_dist_placement entries */ - List *shardIntervalList = LoadShardIntervalList(relationId); + /* Commands to insert pg_dist_shard & pg_dist_placement entries + * Propoagate orphaned shards as well to have consistent behavior + * given all workers have the same metadata. The information is not + * strictly neeeded and will be deleted by cleaner anyways. + */ + List *shardIntervalList = LoadShardIntervalListWithOrphanedShards(relationId); List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList); commandList = list_concat(commandList, shardMetadataInsertCommandList); diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 29f4983a7..0412b5da4 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1068,11 +1068,66 @@ LoadShardIntervalList(Oid relationId) return shardList; } +/* + * LoadShardIntervalListWithOrphanedShards returns a list of shard intervals related for a given + * distributed table. This includes both ACTIVE and TO_DELETE shards. + * The function returns an empty list if no shards can be found for the given relation. + * Since LoadShardIntervalListWithOrphanedShards relies on merging two sorted lists, it returns + * list where elements are sorted on shardminvalue. Shard intervals with uninitialized + * shard min/max values are placed in the end of the list. + */ +List * LoadShardIntervalListWithOrphanedShards(Oid relationId) +{ + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + List *shardList = NIL; + + int indexOne = 0; + int indexTwo = 0; + SortShardIntervalContext sortContext = { + .comparisonFunction = cacheEntry->shardIntervalCompareFunction, + .collation = cacheEntry->partitionColumn->varcollid + }; + + while(indexOne < cacheEntry->shardIntervalArrayLength && indexTwo < cacheEntry->orphanedShardIntervalArrayLength) + { + ShardInterval *leftInterval = cacheEntry->sortedShardIntervalArray[indexOne]; + ShardInterval *rightInterval = cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]; + + int cmp = CompareShardIntervals(leftInterval, rightInterval, &sortContext); + + if (cmp <= 0) + { + shardList = lappend(shardList, leftInterval); + indexOne++; + } + else + { + shardList = lappend(shardList, rightInterval); + indexTwo++; + } + } + + while (indexOne < cacheEntry->shardIntervalArrayLength) + { + shardList = lappend(shardList, cacheEntry->sortedShardIntervalArray[indexOne]); + indexOne++; + } + + while (indexTwo < cacheEntry->orphanedShardIntervalArrayLength) + { + shardList = lappend(shardList, cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]); + indexTwo++; + } + + return shardList; +} + /* * LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a - * given distributed table. The function returns an empty list if no shards can be found - * for the given relation. + * given distributed table. + * The function returns both ACTIVE and TO_DELETE (orphaned) shards in the list. + * The function returns an empty list if no shards can be found for the given relation. * * This function does not use CitusTableCache and instead reads from catalog tables * directly. @@ -1121,6 +1176,7 @@ LoadShardIntervalWithLongestShardName(Oid relationId) int maxShardIndex = shardIntervalCount - 1; uint64 largestShardId = INVALID_SHARD_ID; + /* Given ACTIVE split shards have higher sequence number, we don't need to look into orphaned shards */ for (int shardIndex = 0; shardIndex <= maxShardIndex; ++shardIndex) { ShardInterval *currentShardInterval = @@ -1149,6 +1205,21 @@ ShardIntervalCount(Oid relationId) } +/* + * ShardIntervalCount returns number of shard intervals for a given distributed table. + * This includes both ACTIVE and TO_DELETE shards. + * The function returns 0 if no shards can be found for the given relation id. + */ +int +ShardIntervalCountWithOrphanedShards(Oid relationId) +{ + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + + return cacheEntry->shardIntervalArrayLength + + cacheEntry->orphanedShardIntervalArrayLength; +} + + /* * LoadShardList reads list of shards for given relationId from pg_dist_shard, * and returns the list of found shardIds. diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 372162518..0bb8d7cc8 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4553,6 +4553,7 @@ GenerateSyntheticShardIntervalArray(int partitionCount) shardInterval->shardId = INVALID_SHARD_ID; shardInterval->valueTypeId = INT4OID; + shardInterval->shardState = SHARD_STATE_INVALID_FIRST; shardIntervalArray[shardIndex] = shardInterval; } diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index b0b55a2cd..b951cebab 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -94,7 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString, ); extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); -extern List * DDLTaskList(Oid relationId, const char *commandString); +extern List * DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShards); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern bool AlterTableInProgress(void); extern bool DropSchemaOrDBInProgress(void); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 92f8a4514..4788e5b7a 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -66,9 +66,15 @@ typedef struct bool autoConverted; /* table auto-added to metadata, valid for citus local tables */ /* pg_dist_shard metadata (variable-length ShardInterval array) for this table */ + /* The list includes only ACTIVE shards */ int shardIntervalArrayLength; ShardInterval **sortedShardIntervalArray; + /* pg_dist_shard metadata (variable-length ShardInterval array) for this table */ + /* The list includes only TO_DELETE shards */ + int orphanedShardIntervalArrayLength; + ShardInterval **sortedOrphanedShardIntervalArray; + /* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */ FmgrInfo *shardColumnCompareFunction; @@ -96,6 +102,11 @@ typedef struct /* pg_dist_placement metadata */ GroupShardPlacement **arrayOfPlacementArrays; int *arrayOfPlacementArrayLengths; + + /* pg_dist_placement metadata */ + /* The list includes only TO_DELETE shards */ + GroupShardPlacement **arrayOfOrphanedPlacementArrays; + int *arrayOfOrphanedPlacementArrayLengths; } CitusTableCacheEntry; typedef struct DistObjectCacheEntryKey @@ -143,7 +154,6 @@ extern List * AllCitusTableIds(void); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType tableType); - extern void SetCreateCitusTransactionLevel(int val); extern int GetCitusCreationLevel(void); extern bool IsCitusTable(Oid relationId); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index d7b39b10e..9e5a471f1 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -212,11 +212,15 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS); /* Function declarations to read shard and shard placement data */ extern uint32 TableShardReplicationFactor(Oid relationId); + extern List * LoadShardIntervalList(Oid relationId); +extern List * LoadShardIntervalListWithOrphanedShards(Oid relationId); extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId); extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId); extern int ShardIntervalCount(Oid relationId); +extern int ShardIntervalCountWithOrphanedShards(Oid relationId); extern List * LoadShardList(Oid relationId); + extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval); extern uint64 ShardLength(uint64 shardId); extern bool NodeGroupHasShardPlacements(int32 groupId, diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h index 4cc99e6d5..7c6b878d2 100644 --- a/src/include/distributed/shardinterval_utils.h +++ b/src/include/distributed/shardinterval_utils.h @@ -47,6 +47,8 @@ extern int CompareRelationShards(const void *leftElement, const void *rightElement); extern int ShardIndex(ShardInterval *shardInterval); extern int CalculateUniformHashRangeIndex(int hashedValue, int shardCount); + +// TODO(niupre): Review API. extern ShardInterval * FindShardInterval(Datum partitionColumnValue, CitusTableCacheEntry *cacheEntry); extern int FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry); @@ -54,6 +56,7 @@ extern int SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, Oid shardIntervalCollation, FmgrInfo *compareFunction); + extern bool SingleReplicatedTable(Oid relationId);