From b165be402fa9824c7f1bcbcc284c2f487d8bef06 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Sat, 13 Aug 2022 21:24:27 -0700 Subject: [PATCH] CI fix and reindent --- src/backend/distributed/commands/grant.c | 3 +- src/backend/distributed/commands/statistics.c | 4 +- src/backend/distributed/commands/table.c | 6 +- .../distributed/commands/utility_hook.c | 2 +- .../partitioned_intermediate_results.c | 2 +- .../distributed/metadata/metadata_cache.c | 130 ++++++++++-------- .../distributed/metadata/metadata_sync.c | 8 +- .../distributed/metadata/metadata_utility.c | 15 +- .../distributed/operations/create_shards.c | 5 +- .../distributed/operations/shard_split.c | 23 ++-- .../distributed/operations/stage_protocol.c | 3 +- .../planner/multi_physical_planner.c | 2 +- .../latest.sql | 6 +- .../distributed/commands/utility_hook.h | 3 +- src/include/distributed/metadata_utility.h | 2 +- 15 files changed, 121 insertions(+), 93 deletions(-) diff --git a/src/backend/distributed/commands/grant.c b/src/backend/distributed/commands/grant.c index 84dba1ff4..23042fddc 100644 --- a/src/backend/distributed/commands/grant.c +++ b/src/backend/distributed/commands/grant.c @@ -162,7 +162,8 @@ PreprocessGrantStmt(Node *node, const char *queryString, { /* Propogate latest policies issue on deleted shards to avoid any potential issues */ bool includeOrphanedShards = true; - ddlJob->taskList = DDLTaskList(relationId, ddlString.data, includeOrphanedShards); + ddlJob->taskList = DDLTaskList(relationId, ddlString.data, + includeOrphanedShards); } ddlJobs = lappend(ddlJobs, ddlJob); diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index e196f6541..7d9ff5398 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -481,8 +481,8 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString, ddlJob->metadataSyncCommand = ddlCommand; /* Propogate owner changes on deleted shards to avoid any potential issues */ - // TODO(niupre): Can this cause failure when we try to drop orphaned table? - // If this is the case, do we need to allow CREATE STATS as well? + /* TODO(niupre): Can this cause failure when we try to drop orphaned table? */ + /* If this is the case, do we need to allow CREATE STATS as well? */ bool includeOrphanedShards = true; ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 6483f35ce..2afb718fb 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1165,7 +1165,8 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, /* Propogate latest updates issue on deleted shards to avoid any potential issues */ bool includeOrphanedShards = true; - ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList, includeOrphanedShards); + ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList, + includeOrphanedShards); if (!propagateCommandToWorkers) { ddlJob->taskList = NIL; @@ -1841,7 +1842,8 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, /* Propogate latest schema on deleted shards to avoid any potential issues */ bool includeOrphanedShards = true; - ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand, includeOrphanedShards); + ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand, + includeOrphanedShards); return list_make1(ddlJob); } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index d994e895f..fe7fb8e40 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1618,7 +1618,7 @@ DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShard List *taskList = NIL; List *shardIntervalList = (includeOrphanedShards) ? LoadShardIntervalListWithOrphanedShards(relationId) - : LoadShardIntervalList(relationId); + : LoadShardIntervalList(relationId); Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); char *escapedSchemaName = quote_literal_cstr(schemaName); diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index c170f345e..a1cfe65a3 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -358,7 +358,7 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray, [Anum_pg_dist_shard_shardstorage - 1] = SHARD_STORAGE_VIRTUAL, [Anum_pg_dist_shard_shardminvalue - 1] = minValues[partitionIndex], [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex], - [Anum_pg_dist_shard_shardstate- 1] = SHARD_STATE_INVALID_FIRST + [Anum_pg_dist_shard_shardstate - 1] = SHARD_STATE_INVALID_FIRST }; bool nullsArray[Natts_pg_dist_shard] = { [Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex], diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 1be5040b7..1e40fec95 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -262,8 +262,8 @@ static void RegisterAuthinfoCacheCallbacks(void); static void RegisterCitusTableCacheEntryReleaseCallbacks(void); static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry, - ShardInterval **sortedShardIntervalArray, - int shardIntervalArrayLength); + ShardInterval **sortedShardIntervalArray, + int shardIntervalArrayLength); static void CreateDistTableCache(void); static void CreateShardIdCache(void); static void CreateDistObjectCache(void); @@ -289,7 +289,8 @@ static void CachedRelationNamespaceLookupExtended(const char *relationName, bool missing_ok); static ShardPlacement * ResolveGroupShardPlacement( GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, - ShardIntervalArrayType arrayType, int shardIndex); + ShardIntervalArrayType arrayType, int + shardIndex); static Oid LookupEnumValueId(Oid typeId, char *valueName); static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot); static void InvalidateDistTableCache(void); @@ -298,13 +299,18 @@ static void InitializeTableCacheEntry(int64 shardId); static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel, CitusTableType tableType); static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry); -static void BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength, - ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry); +static void BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int + arrayLength, + ShardIntervalArrayType arrayType, + CitusTableCacheEntry *cacheEntry); static void FreeCitusTableCacheShardAndPlacementEntryFromArray( ShardInterval **shardIntervalArray, - int shardIntervalArrayLength, - GroupShardPlacement **arrayOfPlacementArrays, - int *arrayOfPlacementArrayLengths); + int + shardIntervalArrayLength, + GroupShardPlacement ** + arrayOfPlacementArrays, + int * + arrayOfPlacementArrayLengths); static Oid DistAuthinfoRelationId(void); static Oid DistAuthinfoIndexId(void); @@ -810,7 +816,7 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId) Assert(shardIndex < tableEntry->shardIntervalArrayLength); placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + tableEntry->arrayOfPlacementArrayLengths[shardIndex]; } else if (arrayType == TO_DELETE_SHARD_ARRAY) { @@ -818,7 +824,7 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId) Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex]; numberOfPlacements = - tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; + tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; } for (int i = 0; i < numberOfPlacements; i++) @@ -851,7 +857,8 @@ LoadShardPlacement(uint64 shardId, uint64 placementId) GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId); ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement, - tableEntry, arrayType, shardIndex); + tableEntry, arrayType, + shardIndex); return nodePlacement; } @@ -882,7 +889,7 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId) Assert(shardIndex < tableEntry->shardIntervalArrayLength); placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardIndex]; + tableEntry->arrayOfPlacementArrayLengths[shardIndex]; } else if (arrayType == TO_DELETE_SHARD_ARRAY) { @@ -890,7 +897,7 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId) Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex]; numberOfPlacements = - tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; + tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; } for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) @@ -940,7 +947,6 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, ShardIntervalArrayType arrayType, int shardIndex) { - ShardInterval *shardInterval = NULL; if (arrayType == ACTIVE_SHARD_ARRAY) { @@ -1134,7 +1140,8 @@ ShardPlacementListIncludingOrphanedPlacements(uint64 shardId) /* the offset better be in a valid range */ Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex]; - numberOfPlacements = tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; + numberOfPlacements = + tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex]; } for (int i = 0; i < numberOfPlacements; i++) @@ -1710,30 +1717,30 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) table_close(distShardRelation, AccessShareLock); shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext, - activeShardCount * - sizeof(ShardInterval *)); + activeShardCount * + sizeof(ShardInterval *)); shardIntervalToDeleteArray = MemoryContextAllocZero(MetadataCacheMemoryContext, - orphanedShardCount * - sizeof(ShardInterval *)); + orphanedShardCount * + sizeof(ShardInterval *)); cacheEntry->arrayOfPlacementArrays = MemoryContextAllocZero(MetadataCacheMemoryContext, - activeShardCount * - sizeof(GroupShardPlacement *)); + activeShardCount * + sizeof(GroupShardPlacement *)); cacheEntry->arrayOfPlacementArrayLengths = MemoryContextAllocZero(MetadataCacheMemoryContext, - activeShardCount * - sizeof(int)); + activeShardCount * + sizeof(int)); cacheEntry->arrayOfOrphanedShardsPlacementArrays = MemoryContextAllocZero(MetadataCacheMemoryContext, - orphanedShardCount * - sizeof(GroupShardPlacement *)); + orphanedShardCount * + sizeof(GroupShardPlacement *)); cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths = MemoryContextAllocZero(MetadataCacheMemoryContext, - orphanedShardCount * - sizeof(int)); + orphanedShardCount * + sizeof(int)); } int arrayIndexOne = 0; @@ -1819,11 +1826,13 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) varcollid, shardIntervalCompareFunction); - sortedOrphanedShardIntervalArray = SortShardIntervalArray(shardIntervalToDeleteArray, - orphanedShardCount, - cacheEntry->partitionColumn-> - varcollid, - shardIntervalCompareFunction); + sortedOrphanedShardIntervalArray = SortShardIntervalArray( + shardIntervalToDeleteArray, + orphanedShardCount, + cacheEntry-> + partitionColumn-> + varcollid, + shardIntervalCompareFunction); /* check if there exists any shard intervals with no min/max values */ cacheEntry->hasUninitializedShardInterval = @@ -1855,9 +1864,10 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) /* maintain shardId->(table,ShardInterval) cache and placement list. */ BuildShardIdCacheAndPlacementList(sortedShardIntervalArray, activeShardCount, - ACTIVE_SHARD_ARRAY, cacheEntry); + ACTIVE_SHARD_ARRAY, cacheEntry); - BuildShardIdCacheAndPlacementList(sortedOrphanedShardIntervalArray, orphanedShardCount, + BuildShardIdCacheAndPlacementList(sortedOrphanedShardIntervalArray, + orphanedShardCount, TO_DELETE_SHARD_ARRAY, cacheEntry); cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; @@ -1870,7 +1880,8 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) */ void BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength, - ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry) + ShardIntervalArrayType arrayType, + CitusTableCacheEntry *cacheEntry) { /* maintain shardId->(table,ShardInterval) cache */ for (int shardIndex = 0; shardIndex < arrayLength; shardIndex++) @@ -1895,11 +1906,11 @@ BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayL * been assigned to any other relations. ResetCitusTableCacheEntry() * depends on this. */ - if(arrayType == ACTIVE_SHARD_ARRAY) + if (arrayType == ACTIVE_SHARD_ARRAY) { cacheEntry->shardIntervalArrayLength++; } - else if(arrayType == TO_DELETE_SHARD_ARRAY) + else if (arrayType == TO_DELETE_SHARD_ARRAY) { cacheEntry->orphanedShardIntervalArrayLength++; } @@ -1920,7 +1931,7 @@ BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayL } MemoryContextSwitchTo(oldContext); - if(arrayType == ACTIVE_SHARD_ARRAY) + if (arrayType == ACTIVE_SHARD_ARRAY) { cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray; cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements; @@ -1928,7 +1939,8 @@ BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayL else { cacheEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex] = placementArray; - cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex] = numberOfPlacements; + cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex] = + numberOfPlacements; } /* store the shard index in the ShardInterval */ @@ -4139,11 +4151,11 @@ RegisterAuthinfoCacheCallbacks(void) * */ static void -FreeCitusTableCacheShardAndPlacementEntryFromArray( - ShardInterval **shardIntervalArray, - int shardIntervalArrayLength, - GroupShardPlacement **arrayOfPlacementArrays, - int *arrayOfPlacementArrayLengths) +FreeCitusTableCacheShardAndPlacementEntryFromArray(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength, + GroupShardPlacement ** + arrayOfPlacementArrays, + int *arrayOfPlacementArrayLengths) { for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) @@ -4215,20 +4227,23 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) } /* clean up ShardIdCacheHash */ - RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedShardIntervalArray, cacheEntry->shardIntervalArrayLength); - RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedOrphanedShardIntervalArray, cacheEntry->orphanedShardIntervalArrayLength); + RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedShardIntervalArray, + cacheEntry->shardIntervalArrayLength); + RemoveStaleShardIdCacheEntries(cacheEntry, + cacheEntry->sortedOrphanedShardIntervalArray, + cacheEntry->orphanedShardIntervalArrayLength); FreeCitusTableCacheShardAndPlacementEntryFromArray( - cacheEntry->sortedShardIntervalArray, - cacheEntry->shardIntervalArrayLength, - cacheEntry->arrayOfPlacementArrays, - cacheEntry->arrayOfPlacementArrayLengths); + cacheEntry->sortedShardIntervalArray, + cacheEntry->shardIntervalArrayLength, + cacheEntry->arrayOfPlacementArrays, + cacheEntry->arrayOfPlacementArrayLengths); FreeCitusTableCacheShardAndPlacementEntryFromArray( - cacheEntry->sortedOrphanedShardIntervalArray, - cacheEntry->orphanedShardIntervalArrayLength, - cacheEntry->arrayOfOrphanedShardsPlacementArrays, - cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths); + cacheEntry->sortedOrphanedShardIntervalArray, + cacheEntry->orphanedShardIntervalArrayLength, + cacheEntry->arrayOfOrphanedShardsPlacementArrays, + cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths); if (cacheEntry->sortedShardIntervalArray) { @@ -4288,10 +4303,9 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) * we leave it in place. */ static void -RemoveStaleShardIdCacheEntries( - CitusTableCacheEntry *invalidatedTableEntry, - ShardInterval **sortedShardIntervalArray, - int shardIntervalArrayLength) +RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *invalidatedTableEntry, + ShardInterval **sortedShardIntervalArray, + int shardIntervalArrayLength) { int shardIndex = 0; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 97ed5e0fa..82c518e16 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -3233,11 +3233,13 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) * not sane, the user can only affect its own tables. Given that the * user is owner of the table, we should allow. */ - EnsureShardMetadataIsSane(relationId, shardId, storageType, shardState, shardMinValue, + EnsureShardMetadataIsSane(relationId, shardId, storageType, shardState, + shardMinValue, shardMaxValue); } - InsertShardRow(relationId, shardId, storageType, shardState, shardMinValue, shardMaxValue); + InsertShardRow(relationId, shardId, storageType, shardState, shardMinValue, + shardMaxValue); PG_RETURN_VOID(); } @@ -3632,7 +3634,7 @@ citus_internal_update_shard_and_placement_state_metadata(PG_FUNCTION_ARGS) placement = linitial(shardPlacementList); WorkerNode *workerNode = FindNodeWithNodeId(placement->nodeId, - false /* missingOk */); + false /* missingOk */); if (!workerNode) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index b53b2f4ae..c8e6e276f 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1068,6 +1068,7 @@ LoadShardIntervalList(Oid relationId) return shardList; } + /* * LoadShardIntervalListWithOrphanedShards returns a list of shard intervals related for a given * distributed table. This includes both ACTIVE and TO_DELETE shards. @@ -1076,7 +1077,8 @@ LoadShardIntervalList(Oid relationId) * list where elements are sorted on shardminvalue. Shard intervals with uninitialized * shard min/max values are placed in the end of the list. */ -List * LoadShardIntervalListWithOrphanedShards(Oid relationId) +List * +LoadShardIntervalListWithOrphanedShards(Oid relationId) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); List *shardList = NIL; @@ -1088,10 +1090,12 @@ List * LoadShardIntervalListWithOrphanedShards(Oid relationId) .collation = cacheEntry->partitionColumn->varcollid }; - while(indexOne < cacheEntry->shardIntervalArrayLength && indexTwo < cacheEntry->orphanedShardIntervalArrayLength) + while (indexOne < cacheEntry->shardIntervalArrayLength && indexTwo < + cacheEntry->orphanedShardIntervalArrayLength) { ShardInterval *leftInterval = cacheEntry->sortedShardIntervalArray[indexOne]; - ShardInterval *rightInterval = cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]; + ShardInterval *rightInterval = + cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]; int cmp = CompareShardIntervals(leftInterval, rightInterval, &sortContext); @@ -1115,7 +1119,8 @@ List * LoadShardIntervalListWithOrphanedShards(Oid relationId) while (indexTwo < cacheEntry->orphanedShardIntervalArrayLength) { - shardList = lappend(shardList, cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]); + shardList = lappend(shardList, + cacheEntry->sortedOrphanedShardIntervalArray[indexTwo]); indexTwo++; } @@ -1216,7 +1221,7 @@ ShardIntervalCountWithOrphanedShards(Oid relationId) CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); return cacheEntry->shardIntervalArrayLength + - cacheEntry->orphanedShardIntervalArrayLength; + cacheEntry->orphanedShardIntervalArrayLength; } diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 6ec84dcc0..ca0b79cb0 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -265,7 +265,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool LockRelationOid(sourceRelationId, AccessShareLock); /* prevent placement changes of the source relation until we colocate with them */ - // TODO(niupre): We should only return ACTIVE shards, not ALL shards. + /* TODO(niupre): We should only return ACTIVE shards, not ALL shards. */ List *sourceShardIntervalList = LoadShardIntervalList(sourceRelationId); LockShardListMetadata(sourceShardIntervalList, ShareLock); @@ -378,7 +378,8 @@ CreateReferenceTableShard(Oid distributedTableId) /* shard state is active by default */ ShardState shardState = SHARD_STATE_ACTIVE; - InsertShardRow(distributedTableId, shardId, shardStorageType, shardState, shardMinValue, + InsertShardRow(distributedTableId, shardId, shardStorageType, shardState, + shardMinValue, shardMaxValue); List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId, diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index ffb80673d..1b244e10e 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -565,10 +565,10 @@ BlockingShardSplit(SplitOperation splitOperation, else { /* - * Drop old shards and delete related metadata. Have to do that before - * creating the new shard metadata, because there's cross-checks - * preventing inconsistent metadata (like overlapping shards). - */ + * Drop old shards and delete related metadata. Have to do that before + * creating the new shard metadata, because there's cross-checks + * preventing inconsistent metadata (like overlapping shards). + */ DropShardList(sourceColocatedShardIntervalList); } @@ -1147,6 +1147,7 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, } } + /* * MarkShardListForDrop drops shards and their metadata from both the coordinator and * mx nodes. @@ -1175,9 +1176,9 @@ MarkShardListForDrop(List *shardIntervalList) { StringInfo updateShardCommand = makeStringInfo(); appendStringInfo(updateShardCommand, - "SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)", - shardId, - newState); + "SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)", + shardId, + newState); SendCommandToWorkersWithMetadata(updateShardCommand->data); } @@ -1476,10 +1477,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, else { /* - * Drop old shards and delete related metadata. Have to do that before - * creating the new shard metadata, because there's cross-checks - * preventing inconsistent metadata (like overlapping shards). - */ + * Drop old shards and delete related metadata. Have to do that before + * creating the new shard metadata, because there's cross-checks + * preventing inconsistent metadata (like overlapping shards). + */ DropShardList(sourceColocatedShardIntervalList); } diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index aa6f82014..464affe79 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -187,7 +187,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS) candidateNodeIndex++; } - InsertShardRow(relationId, shardId, storageType, shardState, nullMinValue, nullMaxValue); + InsertShardRow(relationId, shardId, storageType, shardState, nullMinValue, + nullMaxValue); CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, ShardReplicationFactor); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 0bb8d7cc8..8cc3c8219 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3982,7 +3982,7 @@ ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax, int firstComparison = DatumGetInt32(firstDatum); int secondComparison = DatumGetInt32(secondDatum); - // If one of the shards are marked as TO_DELETED, ignore the overlap. + /* If one of the shards are marked as TO_DELETED, ignore the overlap. */ bool markedForDelete = (firstState == SHARD_STATE_TO_DELETE || secondState == SHARD_STATE_TO_DELETE); diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql index 7411d9179..f7f62dc9f 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql @@ -1,10 +1,10 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( relation_id regclass, shard_id bigint, - storage_type "char", shard_min_value text, - shard_max_value text + storage_type "char", shardstate integer, + shard_min_value text, shard_max_value text ) RETURNS void LANGUAGE C AS 'MODULE_PATHNAME'; -COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS +COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS 'Inserts into pg_dist_shard with user checks'; diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index b951cebab..9aea42231 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -94,7 +94,8 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString, ); extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); -extern List * DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShards); +extern List * DDLTaskList(Oid relationId, const char *commandString, bool + includeOrphanedShards); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern bool AlterTableInProgress(void); extern bool DropSchemaOrDBInProgress(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 9e5a471f1..24562becc 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -245,7 +245,7 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, - int shardState, text *shardMinValue, text *shardMaxValue); + int shardState, text *shardMinValue, text *shardMaxValue); extern void DeleteShardRow(uint64 shardId); extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, char shardState, uint64 shardLength,