diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 4e6981fe8..09d560f4a 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -134,7 +134,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, return false; } - List *placementList = FinalizedShardPlacementList(shardInterval->shardId); + List *placementList = ActiveShardPlacementList(shardInterval->shardId); if (list_length(placementList) != 1) { /* punt on this for now */ diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index b2386b734..f1e08c78b 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -532,7 +532,7 @@ CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt) task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; - task->taskPlacementList = FinalizedShardPlacementList(shardId); + task->taskPlacementList = ActiveShardPlacementList(shardId); taskList = lappend(taskList, task); @@ -577,7 +577,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt) task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; - task->taskPlacementList = FinalizedShardPlacementList(shardId); + task->taskPlacementList = ActiveShardPlacementList(shardId); taskList = lappend(taskList, task); @@ -907,7 +907,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; - task->taskPlacementList = FinalizedShardPlacementList(shardId); + task->taskPlacementList = ActiveShardPlacementList(shardId); taskList = lappend(taskList, task); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 6716f2505..91c8e8903 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -208,7 +208,7 @@ static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, static bool BinaryOutputFunctionDefined(Oid typeId); static List * MasterShardPlacementList(uint64 shardId); -static List * RemoteFinalizedShardPlacementList(uint64 shardId); +static List * RemoteActiveShardPlacementList(uint64 shardId); static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, @@ -915,14 +915,14 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, ALLOCSET_DEFAULT_MAXSIZE); - /* release finalized placement list at the end of this function */ + /* release active placement list at the end of this function */ MemoryContext oldContext = MemoryContextSwitchTo(localContext); - List *finalizedPlacementList = MasterShardPlacementList(shardId); + List *activePlacementList = MasterShardPlacementList(shardId); MemoryContextSwitchTo(oldContext); - foreach(placementCell, finalizedPlacementList) + foreach(placementCell, activePlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); char *nodeUser = CurrentUserName(); @@ -981,7 +981,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, } /* if all placements failed, error out */ - if (failedPlacementCount == list_length(finalizedPlacementList)) + if (failedPlacementCount == list_length(activePlacementList)) { ereport(ERROR, (errmsg("could not connect to any active placements"))); } @@ -1097,38 +1097,38 @@ BinaryOutputFunctionDefined(Oid typeId) /* - * MasterShardPlacementList dispatches the finalized shard placements call + * MasterShardPlacementList dispatches the active shard placements call * between local or remote master node according to the master connection state. */ static List * MasterShardPlacementList(uint64 shardId) { - List *finalizedPlacementList = NIL; + List *activePlacementList = NIL; if (masterConnection == NULL) { - finalizedPlacementList = FinalizedShardPlacementList(shardId); + activePlacementList = ActiveShardPlacementList(shardId); } else { - finalizedPlacementList = RemoteFinalizedShardPlacementList(shardId); + activePlacementList = RemoteActiveShardPlacementList(shardId); } - return finalizedPlacementList; + return activePlacementList; } /* - * RemoteFinalizedShardPlacementList gets the finalized shard placement list + * RemoteActiveShardPlacementList gets the active shard placement list * for the given shard id from the remote master node. */ static List * -RemoteFinalizedShardPlacementList(uint64 shardId) +RemoteActiveShardPlacementList(uint64 shardId) { - List *finalizedPlacementList = NIL; + List *activePlacementList = NIL; bool raiseInterrupts = true; StringInfo shardPlacementsCommand = makeStringInfo(); - appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); + appendStringInfo(shardPlacementsCommand, ACTIVE_SHARD_PLACEMENTS_QUERY, shardId); if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data)) { @@ -1161,7 +1161,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId) */ shardPlacement->nodeId = -1; - finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement); + activePlacementList = lappend(activePlacementList, shardPlacement); } } else @@ -1173,7 +1173,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId) queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); Assert(!queryResult); - return finalizedPlacementList; + return activePlacementList; } @@ -3248,17 +3248,17 @@ InitializeCopyShardState(CopyShardState *shardState, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - /* release finalized placement list at the end of this function */ + /* release active placement list at the end of this function */ MemoryContext oldContext = MemoryContextSwitchTo(localContext); - List *finalizedPlacementList = MasterShardPlacementList(shardId); + List *activePlacementList = MasterShardPlacementList(shardId); MemoryContextSwitchTo(oldContext); shardState->shardId = shardId; shardState->placementStateList = NIL; - foreach(placementCell, finalizedPlacementList) + foreach(placementCell, activePlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); @@ -3300,7 +3300,7 @@ InitializeCopyShardState(CopyShardState *shardState, } /* if all placements failed, error out */ - if (failedPlacementCount == list_length(finalizedPlacementList)) + if (failedPlacementCount == list_length(activePlacementList)) { ereport(ERROR, (errmsg("could not connect to any active placements"))); } diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 637fa01c0..cd13e7937 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1323,7 +1323,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = leftShardId; - task->taskPlacementList = FinalizedShardPlacementList(leftShardId); + task->taskPlacementList = ActiveShardPlacementList(leftShardId); task->relationShardList = list_make2(leftRelationShard, rightRelationShard); taskList = lappend(taskList, task); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 540bc6f95..dcdc343af 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -868,7 +868,7 @@ DDLTaskList(Oid relationId, const char *commandString) task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; - task->taskPlacementList = FinalizedShardPlacementList(shardId); + task->taskPlacementList = ActiveShardPlacementList(shardId); taskList = lappend(taskList, task); } diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index c9ecc6e38..4825d48df 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -229,7 +229,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; - task->taskPlacementList = FinalizedShardPlacementList(shardId); + task->taskPlacementList = ActiveShardPlacementList(shardId); taskList = lappend(taskList, task); } diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index cc56f4749..0db02f57c 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -1069,10 +1069,10 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry) ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId); /* - * We only set shard state if its current state is FILE_FINALIZED, which - * prevents overwriting shard state if it is already set at somewhere else. + * We only set shard state if it currently is SHARD_STATE_ACTIVE, which + * prevents overwriting shard state if it was already set somewhere else. */ - if (shardPlacement->shardState == FILE_FINALIZED) + if (shardPlacement->shardState == SHARD_STATE_ACTIVE) { MarkShardPlacementInactive(shardPlacement); } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index c66c8c541..0ad2a246d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3503,10 +3503,10 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede ShardPlacement *shardPlacement = placementExecution->shardPlacement; /* - * We only set shard state if its current state is FILE_FINALIZED, which - * prevents overwriting shard state if it is already set at somewhere else. + * We only set shard state if it currently is SHARD_STATE_ACTIVE, which + * prevents overwriting shard state if it was already set somewhere else. */ - if (shardPlacement->shardState == FILE_FINALIZED) + if (shardPlacement->shardState == SHARD_STATE_ACTIVE) { MarkShardPlacementInactive(shardPlacement); } diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 480f85b82..d03b7e4db 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -458,7 +458,7 @@ ColocationTransfers(List *fragmentList, DistTableCacheEntry *targetRelation) foreach(fragmentCell, fragmentList) { DistributedResultFragment *fragment = lfirst(fragmentCell); - List *placementList = FinalizedShardPlacementList(fragment->targetShardId); + List *placementList = ActiveShardPlacementList(fragment->targetShardId); ListCell *placementCell = NULL; foreach(placementCell, placementList) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 98b683ce0..85de59949 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -412,7 +412,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); LockShardDistributionMetadata(shardId, ShareLock); - List *insertShardPlacementList = FinalizedShardPlacementList(shardId); + List *insertShardPlacementList = ActiveShardPlacementList(shardId); RelationShard *relationShard = CitusMakeNode(RelationShard); relationShard->relationId = targetShardInterval->relationId; diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index f97deb33b..00429bd14 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -305,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool ShardPlacement *sourcePlacement = (ShardPlacement *) lfirst(sourceShardPlacementCell); int32 groupId = sourcePlacement->groupId; - const RelayFileState shardState = FILE_FINALIZED; + const ShardState shardState = SHARD_STATE_ACTIVE; const uint64 shardSize = 0; /* diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index dd5651d71..8cd02dfc2 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -434,7 +434,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, errdetail("Marking this shard placement for " "deletion"))); - UpdateShardPlacementState(placementId, FILE_TO_DELETE); + UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE); continue; } diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 9b3f409a0..88d9a1161 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -620,15 +620,15 @@ CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement) /* * ShardLength finds shard placements for the given shardId, extracts the length - * of a finalized shard, and returns the shard's length. This function errors - * out if we cannot find any finalized shard placements for the given shardId. + * of an active shard, and returns the shard's length. This function errors + * out if we cannot find any active shard placements for the given shardId. */ uint64 ShardLength(uint64 shardId) { uint64 shardLength = 0; - List *shardPlacementList = FinalizedShardPlacementList(shardId); + List *shardPlacementList = ActiveShardPlacementList(shardId); if (shardPlacementList == NIL) { ereport(ERROR, (errmsg("could not find length of shard " UINT64_FORMAT, shardId), @@ -664,7 +664,8 @@ NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements) if (onlyConsiderActivePlacements) { ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(FILE_FINALIZED)); + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum(SHARD_STATE_ACTIVE)); } SysScanDesc scanDescriptor = systable_beginscan(pgPlacement, @@ -673,53 +674,53 @@ NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements) NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); - bool hasFinalizedPlacements = HeapTupleIsValid(heapTuple); + bool hasActivePlacements = HeapTupleIsValid(heapTuple); systable_endscan(scanDescriptor); heap_close(pgPlacement, NoLock); - return hasFinalizedPlacements; + return hasActivePlacements; } /* - * FinalizedShardPlacementList finds shard placements for the given shardId from - * system catalogs, chooses placements that are in finalized state, and returns + * ActiveShardPlacementList finds shard placements for the given shardId from + * system catalogs, chooses placements that are in active state, and returns * these shard placements in a new list. */ List * -FinalizedShardPlacementList(uint64 shardId) +ActiveShardPlacementList(uint64 shardId) { - List *finalizedPlacementList = NIL; + List *activePlacementList = NIL; List *shardPlacementList = ShardPlacementList(shardId); ListCell *shardPlacementCell = NULL; foreach(shardPlacementCell, shardPlacementList) { ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); - if (shardPlacement->shardState == FILE_FINALIZED) + if (shardPlacement->shardState == SHARD_STATE_ACTIVE) { - finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement); + activePlacementList = lappend(activePlacementList, shardPlacement); } } - return SortList(finalizedPlacementList, CompareShardPlacementsByWorker); + return SortList(activePlacementList, CompareShardPlacementsByWorker); } /* - * FinalizedShardPlacement finds a shard placement for the given shardId from - * system catalog, chooses a placement that is in finalized state and returns + * ActiveShardPlacement finds a shard placement for the given shardId from + * system catalog, chooses a placement that is in active state and returns * that shard placement. If this function cannot find a healthy shard placement * and missingOk is set to false it errors out. */ ShardPlacement * -FinalizedShardPlacement(uint64 shardId, bool missingOk) +ActiveShardPlacement(uint64 shardId, bool missingOk) { - List *finalizedPlacementList = FinalizedShardPlacementList(shardId); + List *activePlacementList = ActiveShardPlacementList(shardId); ShardPlacement *shardPlacement = NULL; - if (list_length(finalizedPlacementList) == 0) + if (list_length(activePlacementList) == 0) { if (!missingOk) { @@ -731,7 +732,7 @@ FinalizedShardPlacement(uint64 shardId, bool missingOk) return shardPlacement; } - shardPlacement = (ShardPlacement *) linitial(finalizedPlacementList); + shardPlacement = (ShardPlacement *) linitial(activePlacementList); return shardPlacement; } @@ -1242,14 +1243,13 @@ ShardPlacementOnGroup(uint64 shardId, int groupId) /* * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where - * the state is set to invalid (e.g., FILE_INACTIVE). It also marks the partitions - * of the shard placements as inactive if the shardPlacement belongs to a partitioned - * table. + * the state is set to SHARD_STATE_INACTIVE. It also marks partitions of the + * shard placements as inactive if shardPlacement belongs to a partitioned table. */ void MarkShardPlacementInactive(ShardPlacement *shardPlacement) { - UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE); + UpdateShardPlacementState(shardPlacement->placementId, SHARD_STATE_INACTIVE); /* * In case the shard belongs to a partitioned table, we make sure to update @@ -1259,7 +1259,7 @@ MarkShardPlacementInactive(ShardPlacement *shardPlacement) ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId); if (PartitionedTable(shardInterval->relationId)) { - UpdatePartitionShardPlacementStates(shardPlacement, FILE_INACTIVE); + UpdatePartitionShardPlacementStates(shardPlacement, SHARD_STATE_INACTIVE); } } diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 334bfdbae..7db6038bb 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -322,7 +322,7 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, ShardPlacement *placement = SearchShardPlacementInList(placementList, targetNodeName, targetNodePort, missingOk); - UpdateShardPlacementState(placement->placementId, FILE_FINALIZED); + UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE); } @@ -382,17 +382,17 @@ EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePo sourceNodeName, sourceNodePort, missingSourceOk); - if (sourcePlacement->shardState != FILE_FINALIZED) + if (sourcePlacement->shardState != SHARD_STATE_ACTIVE) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("source placement must be in finalized state"))); + errmsg("source placement must be in active state"))); } ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName, targetNodePort, missingTargetOk); - if (targetPlacement->shardState != FILE_INACTIVE) + if (targetPlacement->shardState != SHARD_STATE_INACTIVE) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("target placement must be in inactive state"))); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 439a44188..b6f030a90 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -277,7 +277,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) char *shardQualifiedName = quote_qualified_identifier(shardSchemaName, shardTableName); - List *shardPlacementList = FinalizedShardPlacementList(shardId); + List *shardPlacementList = ActiveShardPlacementList(shardId); if (shardPlacementList == NIL) { ereport(ERROR, (errmsg("could not find any shard placements for shardId " @@ -397,7 +397,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, char *nodeName = workerNode->workerName; uint32 nodePort = workerNode->workerPort; int shardIndex = -1; /* not used in this code path */ - const RelayFileState shardState = FILE_FINALIZED; + const ShardState shardState = SHARD_STATE_ACTIVE; const uint64 shardSize = 0; MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, @@ -453,7 +453,7 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, int workerNodeIndex = (workerStartIndex + attemptNumber) % workerNodeCount; WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); uint32 nodeGroupId = workerNode->groupId; - const RelayFileState shardState = FILE_FINALIZED; + const ShardState shardState = SHARD_STATE_ACTIVE; const uint64 shardSize = 0; uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, @@ -778,7 +778,7 @@ UpdateShardStatistics(int64 shardId) char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); - List *shardPlacementList = FinalizedShardPlacementList(shardId); + List *shardPlacementList = ActiveShardPlacementList(shardId); /* get shard's statistics from a shard placement */ foreach(shardPlacementCell, shardPlacementList) @@ -819,7 +819,7 @@ UpdateShardStatistics(int64 shardId) int32 groupId = placement->groupId; DeleteShardPlacementRow(placementId); - InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize, + InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, shardSize, groupId); } diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c index 0206085c7..84e89226a 100644 --- a/src/backend/distributed/master/master_truncate.c +++ b/src/backend/distributed/master/master_truncate.c @@ -120,7 +120,7 @@ TruncateTaskList(Oid relationId) task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; - task->taskPlacementList = FinalizedShardPlacementList(shardId); + task->taskPlacementList = ActiveShardPlacementList(shardId); taskList = lappend(taskList, task); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 4908cd430..c016cfacb 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -731,7 +731,7 @@ ShardListInsertCommand(List *shardIntervalList) ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; - List *shardPlacementList = FinalizedShardPlacementList(shardId); + List *shardPlacementList = ActiveShardPlacementList(shardId); ListCell *shardPlacementCell = NULL; foreach(shardPlacementCell, shardPlacementList) diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 0a4c5a8da..1499a2c87 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -327,7 +327,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) return NULL; } - placementList = FinalizedShardPlacementList(shardInterval->shardId); + placementList = ActiveShardPlacementList(shardInterval->shardId); if (list_length(placementList) != 1) { /* punt on this for now */ diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 03ebd0e59..707bfcb6f 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -535,7 +535,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter } /* get the placements for insert target shard and its intersection with select */ - List *insertShardPlacementList = FinalizedShardPlacementList(shardId); + List *insertShardPlacementList = ActiveShardPlacementList(shardId); List *intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, selectPlacementList); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 1b9046145..618d99744 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -5285,7 +5285,7 @@ ActiveShardPlacementLists(List *taskList) Task *task = (Task *) lfirst(taskCell); uint64 anchorShardId = task->anchorShardId; - List *shardPlacementList = FinalizedShardPlacementList(anchorShardId); + List *shardPlacementList = ActiveShardPlacementList(anchorShardId); /* filter out shard placements that reside in inactive nodes */ List *activeShardPlacementList = ActivePlacementList(shardPlacementList); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 59b368350..fb6441db9 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2464,7 +2464,7 @@ WorkersContainingAllShards(List *prunedShardIntervalsList) uint64 shardId = shardInterval->shardId; /* retrieve all active shard placements for this shard */ - List *newPlacementList = FinalizedShardPlacementList(shardId); + List *newPlacementList = ActiveShardPlacementList(shardId); if (firstShard) { diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c index 60e7e1803..6e8d0ef8e 100644 --- a/src/backend/distributed/test/distribution_metadata.c +++ b/src/backend/distributed/test/distribution_metadata.c @@ -114,23 +114,23 @@ load_shard_interval_array(PG_FUNCTION_ARGS) * load_shard_placement_array loads a shard interval using the provided ID * and returns an array of strings containing the node name and port for each * placement of the specified shard interval. If the second argument is true, - * only finalized placements are returned; otherwise, all are. If no such shard + * only active placements are returned; otherwise, all are. If no such shard * interval can be found, this function raises an error instead. */ Datum load_shard_placement_array(PG_FUNCTION_ARGS) { int64 shardId = PG_GETARG_INT64(0); - bool onlyFinalized = PG_GETARG_BOOL(1); + bool onlyActive = PG_GETARG_BOOL(1); List *placementList = NIL; ListCell *placementCell = NULL; int placementIndex = 0; Oid placementTypeId = TEXTOID; StringInfo placementInfo = makeStringInfo(); - if (onlyFinalized) + if (onlyActive) { - placementList = FinalizedShardPlacementList(shardId); + placementList = ActiveShardPlacementList(shardId); } else { diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 942852a78..593e6826e 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -406,7 +406,7 @@ OutShardPlacement(OUTFUNC_ARGS) WRITE_UINT64_FIELD(placementId); WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardLength); - WRITE_ENUM_FIELD(shardState, RelayFileState); + WRITE_ENUM_FIELD(shardState, ShardState); WRITE_INT_FIELD(groupId); WRITE_STRING_FIELD(nodeName); WRITE_UINT_FIELD(nodePort); @@ -427,7 +427,7 @@ OutGroupShardPlacement(OUTFUNC_ARGS) WRITE_UINT64_FIELD(placementId); WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardLength); - WRITE_ENUM_FIELD(shardState, RelayFileState); + WRITE_ENUM_FIELD(shardState, ShardState); WRITE_INT_FIELD(groupId); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index d5661427b..b560e1f65 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -316,7 +316,7 @@ ReadShardPlacement(READFUNC_ARGS) READ_UINT64_FIELD(placementId); READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardLength); - READ_ENUM_FIELD(shardState, RelayFileState); + READ_ENUM_FIELD(shardState, ShardState); READ_INT_FIELD(groupId); READ_STRING_FIELD(nodeName); READ_UINT_FIELD(nodePort); @@ -338,7 +338,7 @@ ReadGroupShardPlacement(READFUNC_ARGS) READ_UINT64_FIELD(placementId); READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardLength); - READ_ENUM_FIELD(shardState, RelayFileState); + READ_ENUM_FIELD(shardState, ShardState); READ_INT_FIELD(groupId); READ_DONE(); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index d0991d155..8be040379 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -269,7 +269,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) uint64 shardId = shardInterval->shardId; bool missingOk = false; - ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); + ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk); char *srcNodeName = sourceShardPlacement->nodeName; uint32 srcNodePort = sourceShardPlacement->nodePort; bool includeData = true; @@ -284,13 +284,12 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) char *tableOwner = TableOwner(shardInterval->relationId); /* - * Although this function is used for reference tables and reference table shard - * placements always have shardState = FILE_FINALIZED, in case of an upgrade of - * a non-reference table to reference table, unhealty placements may exist. In - * this case, we repair the shard placement and update its state in - * pg_dist_placement table. + * Although this function is used for reference tables, and reference table shard + * placements always have shardState = SHARD_STATE_ACTIVE, in case of an upgrade + * of a non-reference table to reference table, unhealty placements may exist. + * In this case, repair the shard placement and update its state in pg_dist_placement. */ - if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) + if (targetPlacement == NULL || targetPlacement->shardState != SHARD_STATE_ACTIVE) { uint64 placementId = 0; int32 groupId = 0; @@ -307,13 +306,14 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) groupId = GroupForNode(nodeName, nodePort); placementId = GetNextPlacementId(); - InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, groupId); + InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0, + groupId); } else { groupId = targetPlacement->groupId; placementId = targetPlacement->placementId; - UpdateShardPlacementState(placementId, FILE_FINALIZED); + UpdateShardPlacementState(placementId, SHARD_STATE_ACTIVE); } /* @@ -326,7 +326,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) if (ShouldSyncTableMetadata(shardInterval->relationId)) { char *placementCommand = PlacementUpsertCommand(shardId, placementId, - FILE_FINALIZED, 0, + SHARD_STATE_ACTIVE, 0, groupId); SendCommandToWorkersWithMetadata(placementCommand); diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 17e12ca42..01bb2e202 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -60,7 +60,7 @@ typedef struct GroupShardPlacement uint64 placementId; /* sequence that implies this placement creation order */ uint64 shardId; uint64 shardLength; - RelayFileState shardState; + ShardState shardState; int32 groupId; } GroupShardPlacement; @@ -75,7 +75,7 @@ typedef struct ShardPlacement uint64 placementId; uint64 shardId; uint64 shardLength; - RelayFileState shardState; + ShardState shardState; int32 groupId; /* the rest of the fields aren't from pg_dist_placement */ @@ -107,8 +107,8 @@ extern void CopyShardPlacement(ShardPlacement *srcPlacement, extern uint64 ShardLength(uint64 shardId); extern bool NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements); -extern List * FinalizedShardPlacementList(uint64 shardId); -extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); +extern List * ActiveShardPlacementList(uint64 shardId); +extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk); extern List * BuildShardPlacementList(ShardInterval *shardInterval); extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 1b4f3c345..30f703039 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -71,7 +71,7 @@ #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s" #define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')" -#define FINALIZED_SHARD_PLACEMENTS_QUERY \ +#define ACTIVE_SHARD_PLACEMENTS_QUERY \ "SELECT placementid, nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = " \ INT64_FORMAT #define UPDATE_SHARD_STATISTICS_QUERY \ diff --git a/src/include/distributed/pg_dist_placement.h b/src/include/distributed/pg_dist_placement.h index 129945aaa..ab98c9d99 100644 --- a/src/include/distributed/pg_dist_placement.h +++ b/src/include/distributed/pg_dist_placement.h @@ -25,7 +25,7 @@ typedef struct FormData_pg_dist_placement { int64 placementid; /* global placementId on remote node */ int64 shardid; /* global shardId on remote node */ - int32 shardstate; /* shard state on remote node; see RelayFileState */ + int32 shardstate; /* shard state on remote node; see ShardState */ int64 shardlength; /* shard length on remote node; stored as bigint */ int32 groupid; /* the group the shard is placed on */ } FormData_pg_dist_placement; diff --git a/src/include/distributed/relay_utility.h b/src/include/distributed/relay_utility.h index 4fa026ac5..82de22e0d 100644 --- a/src/include/distributed/relay_utility.h +++ b/src/include/distributed/relay_utility.h @@ -26,18 +26,15 @@ #define INVALID_PLACEMENT_ID 0 /* - * RelayFileState represents last known states of shards on a given node. We - * currently only have shards in finalized or cached state; and set this state - * after shards are sucessfully staged or cached. + * ShardState represents last known states of shards on a given node. */ typedef enum { - FILE_INVALID_FIRST = 0, - FILE_FINALIZED = 1, - FILE_CACHED = 2, - FILE_INACTIVE = 3, - FILE_TO_DELETE = 4 -} RelayFileState; + SHARD_STATE_INVALID_FIRST = 0, + SHARD_STATE_ACTIVE = 1, + SHARD_STATE_INACTIVE = 3, + SHARD_STATE_TO_DELETE = 4, +} ShardState; /* Function declarations to extend names in DDL commands */ diff --git a/src/test/regress/expected/multi_colocated_shard_transfer.out b/src/test/regress/expected/multi_colocated_shard_transfer.out index 4dfc43a84..cd15be0c3 100644 --- a/src/test/regress/expected/multi_colocated_shard_transfer.out +++ b/src/test/regress/expected/multi_colocated_shard_transfer.out @@ -199,7 +199,7 @@ ORDER BY s.shardid, sp.nodeport; -- repair while all placements of one shard in colocation group is unhealthy SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); -ERROR: source placement must be in finalized state +ERROR: source placement must be in active state -- status after shard repair SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate FROM diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index 1e37ea6d1..584af69d9 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -112,7 +112,7 @@ SELECT load_shard_placement_array(540001, false); {localhost:xxxxx,localhost:xxxxx} (1 row) --- only one of which is finalized +-- only one of which is active SELECT load_shard_placement_array(540001, true); load_shard_placement_array --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_repair_shards.out b/src/test/regress/expected/multi_repair_shards.out index 44ce0818f..710d5a6f1 100644 --- a/src/test/regress/expected/multi_repair_shards.out +++ b/src/test/regress/expected/multi_repair_shards.out @@ -73,7 +73,7 @@ ERROR: target placement must be in inactive state UPDATE pg_dist_placement SET shardstate = 3 WHERE groupid = :worker_2_group and shardid = :newshardid; -- also try to copy from an inactive placement SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -ERROR: source placement must be in finalized state +ERROR: source placement must be in active state -- "copy" this shard from the first placement to the second one SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); master_copy_shard_placement diff --git a/src/test/regress/sql/multi_distribution_metadata.sql b/src/test/regress/sql/multi_distribution_metadata.sql index fbe95b935..c92de854a 100644 --- a/src/test/regress/sql/multi_distribution_metadata.sql +++ b/src/test/regress/sql/multi_distribution_metadata.sql @@ -101,7 +101,7 @@ SELECT load_shard_interval_array(540005, 0); -- should see two placements SELECT load_shard_placement_array(540001, false); --- only one of which is finalized +-- only one of which is active SELECT load_shard_placement_array(540001, true); -- should see error for non-existent shard