CI fix and reindent

niupre/DeferredDrop
Nitish Upreti 2022-08-13 21:24:27 -07:00
parent 96b389d8ef
commit b165be402f
15 changed files with 121 additions and 93 deletions

View File

@ -162,7 +162,8 @@ PreprocessGrantStmt(Node *node, const char *queryString,
{ {
/* Propogate latest policies issue on deleted shards to avoid any potential issues */ /* Propogate latest policies issue on deleted shards to avoid any potential issues */
bool includeOrphanedShards = true; bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relationId, ddlString.data, includeOrphanedShards); ddlJob->taskList = DDLTaskList(relationId, ddlString.data,
includeOrphanedShards);
} }
ddlJobs = lappend(ddlJobs, ddlJob); ddlJobs = lappend(ddlJobs, ddlJob);

View File

@ -481,8 +481,8 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString,
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
/* Propogate owner changes on deleted shards to avoid any potential issues */ /* Propogate owner changes on deleted shards to avoid any potential issues */
// TODO(niupre): Can this cause failure when we try to drop orphaned table? /* TODO(niupre): Can this cause failure when we try to drop orphaned table? */
// If this is the case, do we need to allow CREATE STATS as well? /* If this is the case, do we need to allow CREATE STATS as well? */
bool includeOrphanedShards = true; bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);

View File

@ -1165,7 +1165,8 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
/* Propogate latest updates issue on deleted shards to avoid any potential issues */ /* Propogate latest updates issue on deleted shards to avoid any potential issues */
bool includeOrphanedShards = true; bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList, includeOrphanedShards); ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList,
includeOrphanedShards);
if (!propagateCommandToWorkers) if (!propagateCommandToWorkers)
{ {
ddlJob->taskList = NIL; ddlJob->taskList = NIL;
@ -1841,7 +1842,8 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
/* Propogate latest schema on deleted shards to avoid any potential issues */ /* Propogate latest schema on deleted shards to avoid any potential issues */
bool includeOrphanedShards = true; bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand, includeOrphanedShards); ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand,
includeOrphanedShards);
return list_make1(ddlJob); return list_make1(ddlJob);
} }

View File

@ -1618,7 +1618,7 @@ DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShard
List *taskList = NIL; List *taskList = NIL;
List *shardIntervalList = List *shardIntervalList =
(includeOrphanedShards) ? LoadShardIntervalListWithOrphanedShards(relationId) (includeOrphanedShards) ? LoadShardIntervalListWithOrphanedShards(relationId)
: LoadShardIntervalList(relationId); : LoadShardIntervalList(relationId);
Oid schemaId = get_rel_namespace(relationId); Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName); char *escapedSchemaName = quote_literal_cstr(schemaName);

View File

@ -358,7 +358,7 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
[Anum_pg_dist_shard_shardstorage - 1] = SHARD_STORAGE_VIRTUAL, [Anum_pg_dist_shard_shardstorage - 1] = SHARD_STORAGE_VIRTUAL,
[Anum_pg_dist_shard_shardminvalue - 1] = minValues[partitionIndex], [Anum_pg_dist_shard_shardminvalue - 1] = minValues[partitionIndex],
[Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex], [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex],
[Anum_pg_dist_shard_shardstate- 1] = SHARD_STATE_INVALID_FIRST [Anum_pg_dist_shard_shardstate - 1] = SHARD_STATE_INVALID_FIRST
}; };
bool nullsArray[Natts_pg_dist_shard] = { bool nullsArray[Natts_pg_dist_shard] = {
[Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex], [Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex],

View File

@ -262,8 +262,8 @@ static void RegisterAuthinfoCacheCallbacks(void);
static void RegisterCitusTableCacheEntryReleaseCallbacks(void); static void RegisterCitusTableCacheEntryReleaseCallbacks(void);
static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry);
static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry, static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry,
ShardInterval **sortedShardIntervalArray, ShardInterval **sortedShardIntervalArray,
int shardIntervalArrayLength); int shardIntervalArrayLength);
static void CreateDistTableCache(void); static void CreateDistTableCache(void);
static void CreateShardIdCache(void); static void CreateShardIdCache(void);
static void CreateDistObjectCache(void); static void CreateDistObjectCache(void);
@ -289,7 +289,8 @@ static void CachedRelationNamespaceLookupExtended(const char *relationName,
bool missing_ok); bool missing_ok);
static ShardPlacement * ResolveGroupShardPlacement( static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry,
ShardIntervalArrayType arrayType, int shardIndex); ShardIntervalArrayType arrayType, int
shardIndex);
static Oid LookupEnumValueId(Oid typeId, char *valueName); static Oid LookupEnumValueId(Oid typeId, char *valueName);
static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot); static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot);
static void InvalidateDistTableCache(void); static void InvalidateDistTableCache(void);
@ -298,13 +299,18 @@ static void InitializeTableCacheEntry(int64 shardId);
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel, static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
CitusTableType tableType); CitusTableType tableType);
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry); static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
static void BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength, static void BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int
ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry); arrayLength,
ShardIntervalArrayType arrayType,
CitusTableCacheEntry *cacheEntry);
static void FreeCitusTableCacheShardAndPlacementEntryFromArray( static void FreeCitusTableCacheShardAndPlacementEntryFromArray(
ShardInterval **shardIntervalArray, ShardInterval **shardIntervalArray,
int shardIntervalArrayLength, int
GroupShardPlacement **arrayOfPlacementArrays, shardIntervalArrayLength,
int *arrayOfPlacementArrayLengths); GroupShardPlacement **
arrayOfPlacementArrays,
int *
arrayOfPlacementArrayLengths);
static Oid DistAuthinfoRelationId(void); static Oid DistAuthinfoRelationId(void);
static Oid DistAuthinfoIndexId(void); static Oid DistAuthinfoIndexId(void);
@ -810,7 +816,7 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
Assert(shardIndex < tableEntry->shardIntervalArrayLength); Assert(shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfPlacementArrays[shardIndex];
numberOfPlacements = numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardIndex]; tableEntry->arrayOfPlacementArrayLengths[shardIndex];
} }
else if (arrayType == TO_DELETE_SHARD_ARRAY) else if (arrayType == TO_DELETE_SHARD_ARRAY)
{ {
@ -818,7 +824,7 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex];
numberOfPlacements = numberOfPlacements =
tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex];
} }
for (int i = 0; i < numberOfPlacements; i++) for (int i = 0; i < numberOfPlacements; i++)
@ -851,7 +857,8 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId); GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId);
ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement, ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement,
tableEntry, arrayType, shardIndex); tableEntry, arrayType,
shardIndex);
return nodePlacement; return nodePlacement;
} }
@ -882,7 +889,7 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
Assert(shardIndex < tableEntry->shardIntervalArrayLength); Assert(shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfPlacementArrays[shardIndex];
numberOfPlacements = numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardIndex]; tableEntry->arrayOfPlacementArrayLengths[shardIndex];
} }
else if (arrayType == TO_DELETE_SHARD_ARRAY) else if (arrayType == TO_DELETE_SHARD_ARRAY)
{ {
@ -890,7 +897,7 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex];
numberOfPlacements = numberOfPlacements =
tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex];
} }
for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
@ -940,7 +947,6 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
ShardIntervalArrayType arrayType, ShardIntervalArrayType arrayType,
int shardIndex) int shardIndex)
{ {
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
if (arrayType == ACTIVE_SHARD_ARRAY) if (arrayType == ACTIVE_SHARD_ARRAY)
{ {
@ -1134,7 +1140,8 @@ ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
/* the offset better be in a valid range */ /* the offset better be in a valid range */
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex];
numberOfPlacements = tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; numberOfPlacements =
tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex];
} }
for (int i = 0; i < numberOfPlacements; i++) for (int i = 0; i < numberOfPlacements; i++)
@ -1710,30 +1717,30 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
table_close(distShardRelation, AccessShareLock); table_close(distShardRelation, AccessShareLock);
shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext, shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
activeShardCount * activeShardCount *
sizeof(ShardInterval *)); sizeof(ShardInterval *));
shardIntervalToDeleteArray = MemoryContextAllocZero(MetadataCacheMemoryContext, shardIntervalToDeleteArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount * orphanedShardCount *
sizeof(ShardInterval *)); sizeof(ShardInterval *));
cacheEntry->arrayOfPlacementArrays = cacheEntry->arrayOfPlacementArrays =
MemoryContextAllocZero(MetadataCacheMemoryContext, MemoryContextAllocZero(MetadataCacheMemoryContext,
activeShardCount * activeShardCount *
sizeof(GroupShardPlacement *)); sizeof(GroupShardPlacement *));
cacheEntry->arrayOfPlacementArrayLengths = cacheEntry->arrayOfPlacementArrayLengths =
MemoryContextAllocZero(MetadataCacheMemoryContext, MemoryContextAllocZero(MetadataCacheMemoryContext,
activeShardCount * activeShardCount *
sizeof(int)); sizeof(int));
cacheEntry->arrayOfOrphanedShardsPlacementArrays = cacheEntry->arrayOfOrphanedShardsPlacementArrays =
MemoryContextAllocZero(MetadataCacheMemoryContext, MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount * orphanedShardCount *
sizeof(GroupShardPlacement *)); sizeof(GroupShardPlacement *));
cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths = cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths =
MemoryContextAllocZero(MetadataCacheMemoryContext, MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount * orphanedShardCount *
sizeof(int)); sizeof(int));
} }
int arrayIndexOne = 0; int arrayIndexOne = 0;
@ -1819,11 +1826,13 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
varcollid, varcollid,
shardIntervalCompareFunction); shardIntervalCompareFunction);
sortedOrphanedShardIntervalArray = SortShardIntervalArray(shardIntervalToDeleteArray, sortedOrphanedShardIntervalArray = SortShardIntervalArray(
orphanedShardCount, shardIntervalToDeleteArray,
cacheEntry->partitionColumn-> orphanedShardCount,
varcollid, cacheEntry->
shardIntervalCompareFunction); partitionColumn->
varcollid,
shardIntervalCompareFunction);
/* check if there exists any shard intervals with no min/max values */ /* check if there exists any shard intervals with no min/max values */
cacheEntry->hasUninitializedShardInterval = cacheEntry->hasUninitializedShardInterval =
@ -1855,9 +1864,10 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
/* maintain shardId->(table,ShardInterval) cache and placement list. */ /* maintain shardId->(table,ShardInterval) cache and placement list. */
BuildShardIdCacheAndPlacementList(sortedShardIntervalArray, activeShardCount, BuildShardIdCacheAndPlacementList(sortedShardIntervalArray, activeShardCount,
ACTIVE_SHARD_ARRAY, cacheEntry); ACTIVE_SHARD_ARRAY, cacheEntry);
BuildShardIdCacheAndPlacementList(sortedOrphanedShardIntervalArray, orphanedShardCount, BuildShardIdCacheAndPlacementList(sortedOrphanedShardIntervalArray,
orphanedShardCount,
TO_DELETE_SHARD_ARRAY, cacheEntry); TO_DELETE_SHARD_ARRAY, cacheEntry);
cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction;
@ -1870,7 +1880,8 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
*/ */
void void
BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength, BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength,
ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry) ShardIntervalArrayType arrayType,
CitusTableCacheEntry *cacheEntry)
{ {
/* maintain shardId->(table,ShardInterval) cache */ /* maintain shardId->(table,ShardInterval) cache */
for (int shardIndex = 0; shardIndex < arrayLength; shardIndex++) for (int shardIndex = 0; shardIndex < arrayLength; shardIndex++)
@ -1895,11 +1906,11 @@ BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayL
* been assigned to any other relations. ResetCitusTableCacheEntry() * been assigned to any other relations. ResetCitusTableCacheEntry()
* depends on this. * depends on this.
*/ */
if(arrayType == ACTIVE_SHARD_ARRAY) if (arrayType == ACTIVE_SHARD_ARRAY)
{ {
cacheEntry->shardIntervalArrayLength++; cacheEntry->shardIntervalArrayLength++;
} }
else if(arrayType == TO_DELETE_SHARD_ARRAY) else if (arrayType == TO_DELETE_SHARD_ARRAY)
{ {
cacheEntry->orphanedShardIntervalArrayLength++; cacheEntry->orphanedShardIntervalArrayLength++;
} }
@ -1920,7 +1931,7 @@ BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayL
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
if(arrayType == ACTIVE_SHARD_ARRAY) if (arrayType == ACTIVE_SHARD_ARRAY)
{ {
cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray; cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray;
cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements; cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements;
@ -1928,7 +1939,8 @@ BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayL
else else
{ {
cacheEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex] = placementArray; cacheEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex] = placementArray;
cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex] = numberOfPlacements; cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex] =
numberOfPlacements;
} }
/* store the shard index in the ShardInterval */ /* store the shard index in the ShardInterval */
@ -4139,11 +4151,11 @@ RegisterAuthinfoCacheCallbacks(void)
* *
*/ */
static void static void
FreeCitusTableCacheShardAndPlacementEntryFromArray( FreeCitusTableCacheShardAndPlacementEntryFromArray(ShardInterval **shardIntervalArray,
ShardInterval **shardIntervalArray, int shardIntervalArrayLength,
int shardIntervalArrayLength, GroupShardPlacement **
GroupShardPlacement **arrayOfPlacementArrays, arrayOfPlacementArrays,
int *arrayOfPlacementArrayLengths) int *arrayOfPlacementArrayLengths)
{ {
for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; for (int shardIndex = 0; shardIndex < shardIntervalArrayLength;
shardIndex++) shardIndex++)
@ -4215,20 +4227,23 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
} }
/* clean up ShardIdCacheHash */ /* clean up ShardIdCacheHash */
RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedShardIntervalArray, cacheEntry->shardIntervalArrayLength); RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedShardIntervalArray,
RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedOrphanedShardIntervalArray, cacheEntry->orphanedShardIntervalArrayLength); cacheEntry->shardIntervalArrayLength);
RemoveStaleShardIdCacheEntries(cacheEntry,
cacheEntry->sortedOrphanedShardIntervalArray,
cacheEntry->orphanedShardIntervalArrayLength);
FreeCitusTableCacheShardAndPlacementEntryFromArray( FreeCitusTableCacheShardAndPlacementEntryFromArray(
cacheEntry->sortedShardIntervalArray, cacheEntry->sortedShardIntervalArray,
cacheEntry->shardIntervalArrayLength, cacheEntry->shardIntervalArrayLength,
cacheEntry->arrayOfPlacementArrays, cacheEntry->arrayOfPlacementArrays,
cacheEntry->arrayOfPlacementArrayLengths); cacheEntry->arrayOfPlacementArrayLengths);
FreeCitusTableCacheShardAndPlacementEntryFromArray( FreeCitusTableCacheShardAndPlacementEntryFromArray(
cacheEntry->sortedOrphanedShardIntervalArray, cacheEntry->sortedOrphanedShardIntervalArray,
cacheEntry->orphanedShardIntervalArrayLength, cacheEntry->orphanedShardIntervalArrayLength,
cacheEntry->arrayOfOrphanedShardsPlacementArrays, cacheEntry->arrayOfOrphanedShardsPlacementArrays,
cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths); cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths);
if (cacheEntry->sortedShardIntervalArray) if (cacheEntry->sortedShardIntervalArray)
{ {
@ -4288,10 +4303,9 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
* we leave it in place. * we leave it in place.
*/ */
static void static void
RemoveStaleShardIdCacheEntries( RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *invalidatedTableEntry,
CitusTableCacheEntry *invalidatedTableEntry, ShardInterval **sortedShardIntervalArray,
ShardInterval **sortedShardIntervalArray, int shardIntervalArrayLength)
int shardIntervalArrayLength)
{ {
int shardIndex = 0; int shardIndex = 0;

View File

@ -3233,11 +3233,13 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
* not sane, the user can only affect its own tables. Given that the * not sane, the user can only affect its own tables. Given that the
* user is owner of the table, we should allow. * user is owner of the table, we should allow.
*/ */
EnsureShardMetadataIsSane(relationId, shardId, storageType, shardState, shardMinValue, EnsureShardMetadataIsSane(relationId, shardId, storageType, shardState,
shardMinValue,
shardMaxValue); shardMaxValue);
} }
InsertShardRow(relationId, shardId, storageType, shardState, shardMinValue, shardMaxValue); InsertShardRow(relationId, shardId, storageType, shardState, shardMinValue,
shardMaxValue);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -3632,7 +3634,7 @@ citus_internal_update_shard_and_placement_state_metadata(PG_FUNCTION_ARGS)
placement = linitial(shardPlacementList); placement = linitial(shardPlacementList);
WorkerNode *workerNode = FindNodeWithNodeId(placement->nodeId, WorkerNode *workerNode = FindNodeWithNodeId(placement->nodeId,
false /* missingOk */); false /* missingOk */);
if (!workerNode) if (!workerNode)
{ {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

View File

@ -1068,6 +1068,7 @@ LoadShardIntervalList(Oid relationId)
return shardList; return shardList;
} }
/* /*
* LoadShardIntervalListWithOrphanedShards returns a list of shard intervals related for a given * LoadShardIntervalListWithOrphanedShards returns a list of shard intervals related for a given
* distributed table. This includes both ACTIVE and TO_DELETE shards. * distributed table. This includes both ACTIVE and TO_DELETE shards.
@ -1076,7 +1077,8 @@ LoadShardIntervalList(Oid relationId)
* list where elements are sorted on shardminvalue. Shard intervals with uninitialized * list where elements are sorted on shardminvalue. Shard intervals with uninitialized
* shard min/max values are placed in the end of the list. * shard min/max values are placed in the end of the list.
*/ */
List * LoadShardIntervalListWithOrphanedShards(Oid relationId) List *
LoadShardIntervalListWithOrphanedShards(Oid relationId)
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
List *shardList = NIL; List *shardList = NIL;
@ -1088,10 +1090,12 @@ List * LoadShardIntervalListWithOrphanedShards(Oid relationId)
.collation = cacheEntry->partitionColumn->varcollid .collation = cacheEntry->partitionColumn->varcollid
}; };
while(indexOne < cacheEntry->shardIntervalArrayLength && indexTwo < cacheEntry->orphanedShardIntervalArrayLength) while (indexOne < cacheEntry->shardIntervalArrayLength && indexTwo <
cacheEntry->orphanedShardIntervalArrayLength)
{ {
ShardInterval *leftInterval = cacheEntry->sortedShardIntervalArray[indexOne]; ShardInterval *leftInterval = cacheEntry->sortedShardIntervalArray[indexOne];
ShardInterval *rightInterval = cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]; ShardInterval *rightInterval =
cacheEntry->sortedOrphanedShardIntervalArray[indexTwo];
int cmp = CompareShardIntervals(leftInterval, rightInterval, &sortContext); int cmp = CompareShardIntervals(leftInterval, rightInterval, &sortContext);
@ -1115,7 +1119,8 @@ List * LoadShardIntervalListWithOrphanedShards(Oid relationId)
while (indexTwo < cacheEntry->orphanedShardIntervalArrayLength) while (indexTwo < cacheEntry->orphanedShardIntervalArrayLength)
{ {
shardList = lappend(shardList, cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]); shardList = lappend(shardList,
cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]);
indexTwo++; indexTwo++;
} }
@ -1216,7 +1221,7 @@ ShardIntervalCountWithOrphanedShards(Oid relationId)
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
return cacheEntry->shardIntervalArrayLength + return cacheEntry->shardIntervalArrayLength +
cacheEntry->orphanedShardIntervalArrayLength; cacheEntry->orphanedShardIntervalArrayLength;
} }

View File

@ -265,7 +265,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
LockRelationOid(sourceRelationId, AccessShareLock); LockRelationOid(sourceRelationId, AccessShareLock);
/* prevent placement changes of the source relation until we colocate with them */ /* prevent placement changes of the source relation until we colocate with them */
// TODO(niupre): We should only return ACTIVE shards, not ALL shards. /* TODO(niupre): We should only return ACTIVE shards, not ALL shards. */
List *sourceShardIntervalList = LoadShardIntervalList(sourceRelationId); List *sourceShardIntervalList = LoadShardIntervalList(sourceRelationId);
LockShardListMetadata(sourceShardIntervalList, ShareLock); LockShardListMetadata(sourceShardIntervalList, ShareLock);
@ -378,7 +378,8 @@ CreateReferenceTableShard(Oid distributedTableId)
/* shard state is active by default */ /* shard state is active by default */
ShardState shardState = SHARD_STATE_ACTIVE; ShardState shardState = SHARD_STATE_ACTIVE;
InsertShardRow(distributedTableId, shardId, shardStorageType, shardState, shardMinValue, InsertShardRow(distributedTableId, shardId, shardStorageType, shardState,
shardMinValue,
shardMaxValue); shardMaxValue);
List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId, List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,

View File

@ -565,10 +565,10 @@ BlockingShardSplit(SplitOperation splitOperation,
else else
{ {
/* /*
* Drop old shards and delete related metadata. Have to do that before * Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks * creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards). * preventing inconsistent metadata (like overlapping shards).
*/ */
DropShardList(sourceColocatedShardIntervalList); DropShardList(sourceColocatedShardIntervalList);
} }
@ -1147,6 +1147,7 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
} }
} }
/* /*
* MarkShardListForDrop drops shards and their metadata from both the coordinator and * MarkShardListForDrop drops shards and their metadata from both the coordinator and
* mx nodes. * mx nodes.
@ -1175,9 +1176,9 @@ MarkShardListForDrop(List *shardIntervalList)
{ {
StringInfo updateShardCommand = makeStringInfo(); StringInfo updateShardCommand = makeStringInfo();
appendStringInfo(updateShardCommand, appendStringInfo(updateShardCommand,
"SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)", "SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)",
shardId, shardId,
newState); newState);
SendCommandToWorkersWithMetadata(updateShardCommand->data); SendCommandToWorkersWithMetadata(updateShardCommand->data);
} }
@ -1476,10 +1477,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
else else
{ {
/* /*
* Drop old shards and delete related metadata. Have to do that before * Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks * creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards). * preventing inconsistent metadata (like overlapping shards).
*/ */
DropShardList(sourceColocatedShardIntervalList); DropShardList(sourceColocatedShardIntervalList);
} }

View File

@ -187,7 +187,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++; candidateNodeIndex++;
} }
InsertShardRow(relationId, shardId, storageType, shardState, nullMinValue, nullMaxValue); InsertShardRow(relationId, shardId, storageType, shardState, nullMinValue,
nullMaxValue);
CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
ShardReplicationFactor); ShardReplicationFactor);

View File

@ -3982,7 +3982,7 @@ ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,
int firstComparison = DatumGetInt32(firstDatum); int firstComparison = DatumGetInt32(firstDatum);
int secondComparison = DatumGetInt32(secondDatum); int secondComparison = DatumGetInt32(secondDatum);
// If one of the shards are marked as TO_DELETED, ignore the overlap. /* If one of the shards are marked as TO_DELETED, ignore the overlap. */
bool markedForDelete = (firstState == SHARD_STATE_TO_DELETE || bool markedForDelete = (firstState == SHARD_STATE_TO_DELETE ||
secondState == SHARD_STATE_TO_DELETE); secondState == SHARD_STATE_TO_DELETE);

View File

@ -1,10 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint, relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text, storage_type "char", shardstate integer,
shard_max_value text shard_min_value text, shard_max_value text
) )
RETURNS void RETURNS void
LANGUAGE C LANGUAGE C
AS 'MODULE_PATHNAME'; AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS
'Inserts into pg_dist_shard with user checks'; 'Inserts into pg_dist_shard with user checks';

View File

@ -94,7 +94,8 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
); );
extern void MarkInvalidateForeignKeyGraph(void); extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void); extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShards); extern List * DDLTaskList(Oid relationId, const char *commandString, bool
includeOrphanedShards);
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern bool AlterTableInProgress(void); extern bool AlterTableInProgress(void);
extern bool DropSchemaOrDBInProgress(void); extern bool DropSchemaOrDBInProgress(void);

View File

@ -245,7 +245,7 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
/* Function declarations to modify shard and shard placement data */ /* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
int shardState, text *shardMinValue, text *shardMaxValue); int shardState, text *shardMinValue, text *shardMaxValue);
extern void DeleteShardRow(uint64 shardId); extern void DeleteShardRow(uint64 shardId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength, char shardState, uint64 shardLength,