Merge pull request #3375 from citusdata/rename-relayfilestate

Rename RelayFileState to ShardState
pull/3367/head
Philip Dubé 2020-01-12 06:18:36 +00:00 committed by GitHub
commit 8b4429e2dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 110 additions and 113 deletions

View File

@ -134,7 +134,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
return false; return false;
} }
List *placementList = FinalizedShardPlacementList(shardInterval->shardId); List *placementList = ActiveShardPlacementList(shardInterval->shardId);
if (list_length(placementList) != 1) if (list_length(placementList) != 1)
{ {
/* punt on this for now */ /* punt on this for now */

View File

@ -532,7 +532,7 @@ CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt)
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
@ -577,7 +577,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt)
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
@ -907,7 +907,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);

View File

@ -208,7 +208,7 @@ static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
static bool BinaryOutputFunctionDefined(Oid typeId); static bool BinaryOutputFunctionDefined(Oid typeId);
static List * MasterShardPlacementList(uint64 shardId); static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId); static List * RemoteActiveShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
List *connectionList); List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
@ -915,14 +915,14 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
/* release finalized placement list at the end of this function */ /* release active placement list at the end of this function */
MemoryContext oldContext = MemoryContextSwitchTo(localContext); MemoryContext oldContext = MemoryContextSwitchTo(localContext);
List *finalizedPlacementList = MasterShardPlacementList(shardId); List *activePlacementList = MasterShardPlacementList(shardId);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
foreach(placementCell, finalizedPlacementList) foreach(placementCell, activePlacementList)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
@ -981,7 +981,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
} }
/* if all placements failed, error out */ /* if all placements failed, error out */
if (failedPlacementCount == list_length(finalizedPlacementList)) if (failedPlacementCount == list_length(activePlacementList))
{ {
ereport(ERROR, (errmsg("could not connect to any active placements"))); ereport(ERROR, (errmsg("could not connect to any active placements")));
} }
@ -1097,38 +1097,38 @@ BinaryOutputFunctionDefined(Oid typeId)
/* /*
* MasterShardPlacementList dispatches the finalized shard placements call * MasterShardPlacementList dispatches the active shard placements call
* between local or remote master node according to the master connection state. * between local or remote master node according to the master connection state.
*/ */
static List * static List *
MasterShardPlacementList(uint64 shardId) MasterShardPlacementList(uint64 shardId)
{ {
List *finalizedPlacementList = NIL; List *activePlacementList = NIL;
if (masterConnection == NULL) if (masterConnection == NULL)
{ {
finalizedPlacementList = FinalizedShardPlacementList(shardId); activePlacementList = ActiveShardPlacementList(shardId);
} }
else else
{ {
finalizedPlacementList = RemoteFinalizedShardPlacementList(shardId); activePlacementList = RemoteActiveShardPlacementList(shardId);
} }
return finalizedPlacementList; return activePlacementList;
} }
/* /*
* RemoteFinalizedShardPlacementList gets the finalized shard placement list * RemoteActiveShardPlacementList gets the active shard placement list
* for the given shard id from the remote master node. * for the given shard id from the remote master node.
*/ */
static List * static List *
RemoteFinalizedShardPlacementList(uint64 shardId) RemoteActiveShardPlacementList(uint64 shardId)
{ {
List *finalizedPlacementList = NIL; List *activePlacementList = NIL;
bool raiseInterrupts = true; bool raiseInterrupts = true;
StringInfo shardPlacementsCommand = makeStringInfo(); StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); appendStringInfo(shardPlacementsCommand, ACTIVE_SHARD_PLACEMENTS_QUERY, shardId);
if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data)) if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data))
{ {
@ -1161,7 +1161,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
*/ */
shardPlacement->nodeId = -1; shardPlacement->nodeId = -1;
finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement); activePlacementList = lappend(activePlacementList, shardPlacement);
} }
} }
else else
@ -1173,7 +1173,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult); Assert(!queryResult);
return finalizedPlacementList; return activePlacementList;
} }
@ -3248,17 +3248,17 @@ InitializeCopyShardState(CopyShardState *shardState,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
/* release finalized placement list at the end of this function */ /* release active placement list at the end of this function */
MemoryContext oldContext = MemoryContextSwitchTo(localContext); MemoryContext oldContext = MemoryContextSwitchTo(localContext);
List *finalizedPlacementList = MasterShardPlacementList(shardId); List *activePlacementList = MasterShardPlacementList(shardId);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
shardState->shardId = shardId; shardState->shardId = shardId;
shardState->placementStateList = NIL; shardState->placementStateList = NIL;
foreach(placementCell, finalizedPlacementList) foreach(placementCell, activePlacementList)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
@ -3300,7 +3300,7 @@ InitializeCopyShardState(CopyShardState *shardState,
} }
/* if all placements failed, error out */ /* if all placements failed, error out */
if (failedPlacementCount == list_length(finalizedPlacementList)) if (failedPlacementCount == list_length(activePlacementList))
{ {
ereport(ERROR, (errmsg("could not connect to any active placements"))); ereport(ERROR, (errmsg("could not connect to any active placements")));
} }

View File

@ -1323,7 +1323,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = leftShardId; task->anchorShardId = leftShardId;
task->taskPlacementList = FinalizedShardPlacementList(leftShardId); task->taskPlacementList = ActiveShardPlacementList(leftShardId);
task->relationShardList = list_make2(leftRelationShard, rightRelationShard); task->relationShardList = list_make2(leftRelationShard, rightRelationShard);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);

View File

@ -868,7 +868,7 @@ DDLTaskList(Oid relationId, const char *commandString)
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
} }

View File

@ -229,7 +229,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
} }

View File

@ -1069,10 +1069,10 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId); ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
/* /*
* We only set shard state if its current state is FILE_FINALIZED, which * We only set shard state if it currently is SHARD_STATE_ACTIVE, which
* prevents overwriting shard state if it is already set at somewhere else. * prevents overwriting shard state if it was already set somewhere else.
*/ */
if (shardPlacement->shardState == FILE_FINALIZED) if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
{ {
MarkShardPlacementInactive(shardPlacement); MarkShardPlacementInactive(shardPlacement);
} }

View File

@ -3503,10 +3503,10 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
ShardPlacement *shardPlacement = placementExecution->shardPlacement; ShardPlacement *shardPlacement = placementExecution->shardPlacement;
/* /*
* We only set shard state if its current state is FILE_FINALIZED, which * We only set shard state if it currently is SHARD_STATE_ACTIVE, which
* prevents overwriting shard state if it is already set at somewhere else. * prevents overwriting shard state if it was already set somewhere else.
*/ */
if (shardPlacement->shardState == FILE_FINALIZED) if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
{ {
MarkShardPlacementInactive(shardPlacement); MarkShardPlacementInactive(shardPlacement);
} }

View File

@ -458,7 +458,7 @@ ColocationTransfers(List *fragmentList, DistTableCacheEntry *targetRelation)
foreach(fragmentCell, fragmentList) foreach(fragmentCell, fragmentList)
{ {
DistributedResultFragment *fragment = lfirst(fragmentCell); DistributedResultFragment *fragment = lfirst(fragmentCell);
List *placementList = FinalizedShardPlacementList(fragment->targetShardId); List *placementList = ActiveShardPlacementList(fragment->targetShardId);
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
foreach(placementCell, placementList) foreach(placementCell, placementList)

View File

@ -412,7 +412,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));
LockShardDistributionMetadata(shardId, ShareLock); LockShardDistributionMetadata(shardId, ShareLock);
List *insertShardPlacementList = FinalizedShardPlacementList(shardId); List *insertShardPlacementList = ActiveShardPlacementList(shardId);
RelationShard *relationShard = CitusMakeNode(RelationShard); RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = targetShardInterval->relationId; relationShard->relationId = targetShardInterval->relationId;

View File

@ -305,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
ShardPlacement *sourcePlacement = ShardPlacement *sourcePlacement =
(ShardPlacement *) lfirst(sourceShardPlacementCell); (ShardPlacement *) lfirst(sourceShardPlacementCell);
int32 groupId = sourcePlacement->groupId; int32 groupId = sourcePlacement->groupId;
const RelayFileState shardState = FILE_FINALIZED; const ShardState shardState = SHARD_STATE_ACTIVE;
const uint64 shardSize = 0; const uint64 shardSize = 0;
/* /*

View File

@ -434,7 +434,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
errdetail("Marking this shard placement for " errdetail("Marking this shard placement for "
"deletion"))); "deletion")));
UpdateShardPlacementState(placementId, FILE_TO_DELETE); UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE);
continue; continue;
} }

View File

@ -620,15 +620,15 @@ CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement)
/* /*
* ShardLength finds shard placements for the given shardId, extracts the length * ShardLength finds shard placements for the given shardId, extracts the length
* of a finalized shard, and returns the shard's length. This function errors * of an active shard, and returns the shard's length. This function errors
* out if we cannot find any finalized shard placements for the given shardId. * out if we cannot find any active shard placements for the given shardId.
*/ */
uint64 uint64
ShardLength(uint64 shardId) ShardLength(uint64 shardId)
{ {
uint64 shardLength = 0; uint64 shardLength = 0;
List *shardPlacementList = FinalizedShardPlacementList(shardId); List *shardPlacementList = ActiveShardPlacementList(shardId);
if (shardPlacementList == NIL) if (shardPlacementList == NIL)
{ {
ereport(ERROR, (errmsg("could not find length of shard " UINT64_FORMAT, shardId), ereport(ERROR, (errmsg("could not find length of shard " UINT64_FORMAT, shardId),
@ -664,7 +664,8 @@ NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements)
if (onlyConsiderActivePlacements) if (onlyConsiderActivePlacements)
{ {
ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate, ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(FILE_FINALIZED)); BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(SHARD_STATE_ACTIVE));
} }
SysScanDesc scanDescriptor = systable_beginscan(pgPlacement, SysScanDesc scanDescriptor = systable_beginscan(pgPlacement,
@ -673,53 +674,53 @@ NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements)
NULL, scanKeyCount, scanKey); NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
bool hasFinalizedPlacements = HeapTupleIsValid(heapTuple); bool hasActivePlacements = HeapTupleIsValid(heapTuple);
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgPlacement, NoLock); heap_close(pgPlacement, NoLock);
return hasFinalizedPlacements; return hasActivePlacements;
} }
/* /*
* FinalizedShardPlacementList finds shard placements for the given shardId from * ActiveShardPlacementList finds shard placements for the given shardId from
* system catalogs, chooses placements that are in finalized state, and returns * system catalogs, chooses placements that are in active state, and returns
* these shard placements in a new list. * these shard placements in a new list.
*/ */
List * List *
FinalizedShardPlacementList(uint64 shardId) ActiveShardPlacementList(uint64 shardId)
{ {
List *finalizedPlacementList = NIL; List *activePlacementList = NIL;
List *shardPlacementList = ShardPlacementList(shardId); List *shardPlacementList = ShardPlacementList(shardId);
ListCell *shardPlacementCell = NULL; ListCell *shardPlacementCell = NULL;
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)
{ {
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
if (shardPlacement->shardState == FILE_FINALIZED) if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
{ {
finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement); activePlacementList = lappend(activePlacementList, shardPlacement);
} }
} }
return SortList(finalizedPlacementList, CompareShardPlacementsByWorker); return SortList(activePlacementList, CompareShardPlacementsByWorker);
} }
/* /*
* FinalizedShardPlacement finds a shard placement for the given shardId from * ActiveShardPlacement finds a shard placement for the given shardId from
* system catalog, chooses a placement that is in finalized state and returns * system catalog, chooses a placement that is in active state and returns
* that shard placement. If this function cannot find a healthy shard placement * that shard placement. If this function cannot find a healthy shard placement
* and missingOk is set to false it errors out. * and missingOk is set to false it errors out.
*/ */
ShardPlacement * ShardPlacement *
FinalizedShardPlacement(uint64 shardId, bool missingOk) ActiveShardPlacement(uint64 shardId, bool missingOk)
{ {
List *finalizedPlacementList = FinalizedShardPlacementList(shardId); List *activePlacementList = ActiveShardPlacementList(shardId);
ShardPlacement *shardPlacement = NULL; ShardPlacement *shardPlacement = NULL;
if (list_length(finalizedPlacementList) == 0) if (list_length(activePlacementList) == 0)
{ {
if (!missingOk) if (!missingOk)
{ {
@ -731,7 +732,7 @@ FinalizedShardPlacement(uint64 shardId, bool missingOk)
return shardPlacement; return shardPlacement;
} }
shardPlacement = (ShardPlacement *) linitial(finalizedPlacementList); shardPlacement = (ShardPlacement *) linitial(activePlacementList);
return shardPlacement; return shardPlacement;
} }
@ -1242,14 +1243,13 @@ ShardPlacementOnGroup(uint64 shardId, int groupId)
/* /*
* MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where
* the state is set to invalid (e.g., FILE_INACTIVE). It also marks the partitions * the state is set to SHARD_STATE_INACTIVE. It also marks partitions of the
* of the shard placements as inactive if the shardPlacement belongs to a partitioned * shard placements as inactive if shardPlacement belongs to a partitioned table.
* table.
*/ */
void void
MarkShardPlacementInactive(ShardPlacement *shardPlacement) MarkShardPlacementInactive(ShardPlacement *shardPlacement)
{ {
UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE); UpdateShardPlacementState(shardPlacement->placementId, SHARD_STATE_INACTIVE);
/* /*
* In case the shard belongs to a partitioned table, we make sure to update * In case the shard belongs to a partitioned table, we make sure to update
@ -1259,7 +1259,7 @@ MarkShardPlacementInactive(ShardPlacement *shardPlacement)
ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId); ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
if (PartitionedTable(shardInterval->relationId)) if (PartitionedTable(shardInterval->relationId))
{ {
UpdatePartitionShardPlacementStates(shardPlacement, FILE_INACTIVE); UpdatePartitionShardPlacementStates(shardPlacement, SHARD_STATE_INACTIVE);
} }
} }

View File

@ -322,7 +322,7 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
ShardPlacement *placement = SearchShardPlacementInList(placementList, targetNodeName, ShardPlacement *placement = SearchShardPlacementInList(placementList, targetNodeName,
targetNodePort, targetNodePort,
missingOk); missingOk);
UpdateShardPlacementState(placement->placementId, FILE_FINALIZED); UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE);
} }
@ -382,17 +382,17 @@ EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePo
sourceNodeName, sourceNodeName,
sourceNodePort, sourceNodePort,
missingSourceOk); missingSourceOk);
if (sourcePlacement->shardState != FILE_FINALIZED) if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
{ {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("source placement must be in finalized state"))); errmsg("source placement must be in active state")));
} }
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName, targetNodeName,
targetNodePort, targetNodePort,
missingTargetOk); missingTargetOk);
if (targetPlacement->shardState != FILE_INACTIVE) if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
{ {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("target placement must be in inactive state"))); errmsg("target placement must be in inactive state")));

View File

@ -277,7 +277,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
char *shardQualifiedName = quote_qualified_identifier(shardSchemaName, char *shardQualifiedName = quote_qualified_identifier(shardSchemaName,
shardTableName); shardTableName);
List *shardPlacementList = FinalizedShardPlacementList(shardId); List *shardPlacementList = ActiveShardPlacementList(shardId);
if (shardPlacementList == NIL) if (shardPlacementList == NIL)
{ {
ereport(ERROR, (errmsg("could not find any shard placements for shardId " ereport(ERROR, (errmsg("could not find any shard placements for shardId "
@ -397,7 +397,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
char *nodeName = workerNode->workerName; char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort; uint32 nodePort = workerNode->workerPort;
int shardIndex = -1; /* not used in this code path */ int shardIndex = -1; /* not used in this code path */
const RelayFileState shardState = FILE_FINALIZED; const ShardState shardState = SHARD_STATE_ACTIVE;
const uint64 shardSize = 0; const uint64 shardSize = 0;
MultiConnection *connection = MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort,
@ -453,7 +453,7 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int workerNodeIndex = (workerStartIndex + attemptNumber) % workerNodeCount; int workerNodeIndex = (workerStartIndex + attemptNumber) % workerNodeCount;
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex);
uint32 nodeGroupId = workerNode->groupId; uint32 nodeGroupId = workerNode->groupId;
const RelayFileState shardState = FILE_FINALIZED; const ShardState shardState = SHARD_STATE_ACTIVE;
const uint64 shardSize = 0; const uint64 shardSize = 0;
uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
@ -778,7 +778,7 @@ UpdateShardStatistics(int64 shardId)
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
List *shardPlacementList = FinalizedShardPlacementList(shardId); List *shardPlacementList = ActiveShardPlacementList(shardId);
/* get shard's statistics from a shard placement */ /* get shard's statistics from a shard placement */
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)
@ -819,7 +819,7 @@ UpdateShardStatistics(int64 shardId)
int32 groupId = placement->groupId; int32 groupId = placement->groupId;
DeleteShardPlacementRow(placementId); DeleteShardPlacementRow(placementId);
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize, InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, shardSize,
groupId); groupId);
} }

View File

@ -120,7 +120,7 @@ TruncateTaskList(Oid relationId)
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
} }

View File

@ -731,7 +731,7 @@ ShardListInsertCommand(List *shardIntervalList)
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
List *shardPlacementList = FinalizedShardPlacementList(shardId); List *shardPlacementList = ActiveShardPlacementList(shardId);
ListCell *shardPlacementCell = NULL; ListCell *shardPlacementCell = NULL;
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)

View File

@ -327,7 +327,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
return NULL; return NULL;
} }
placementList = FinalizedShardPlacementList(shardInterval->shardId); placementList = ActiveShardPlacementList(shardInterval->shardId);
if (list_length(placementList) != 1) if (list_length(placementList) != 1)
{ {
/* punt on this for now */ /* punt on this for now */

View File

@ -535,7 +535,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
} }
/* get the placements for insert target shard and its intersection with select */ /* get the placements for insert target shard and its intersection with select */
List *insertShardPlacementList = FinalizedShardPlacementList(shardId); List *insertShardPlacementList = ActiveShardPlacementList(shardId);
List *intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, List *intersectedPlacementList = IntersectPlacementList(insertShardPlacementList,
selectPlacementList); selectPlacementList);

View File

@ -5285,7 +5285,7 @@ ActiveShardPlacementLists(List *taskList)
Task *task = (Task *) lfirst(taskCell); Task *task = (Task *) lfirst(taskCell);
uint64 anchorShardId = task->anchorShardId; uint64 anchorShardId = task->anchorShardId;
List *shardPlacementList = FinalizedShardPlacementList(anchorShardId); List *shardPlacementList = ActiveShardPlacementList(anchorShardId);
/* filter out shard placements that reside in inactive nodes */ /* filter out shard placements that reside in inactive nodes */
List *activeShardPlacementList = ActivePlacementList(shardPlacementList); List *activeShardPlacementList = ActivePlacementList(shardPlacementList);

View File

@ -2464,7 +2464,7 @@ WorkersContainingAllShards(List *prunedShardIntervalsList)
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
/* retrieve all active shard placements for this shard */ /* retrieve all active shard placements for this shard */
List *newPlacementList = FinalizedShardPlacementList(shardId); List *newPlacementList = ActiveShardPlacementList(shardId);
if (firstShard) if (firstShard)
{ {

View File

@ -114,23 +114,23 @@ load_shard_interval_array(PG_FUNCTION_ARGS)
* load_shard_placement_array loads a shard interval using the provided ID * load_shard_placement_array loads a shard interval using the provided ID
* and returns an array of strings containing the node name and port for each * and returns an array of strings containing the node name and port for each
* placement of the specified shard interval. If the second argument is true, * placement of the specified shard interval. If the second argument is true,
* only finalized placements are returned; otherwise, all are. If no such shard * only active placements are returned; otherwise, all are. If no such shard
* interval can be found, this function raises an error instead. * interval can be found, this function raises an error instead.
*/ */
Datum Datum
load_shard_placement_array(PG_FUNCTION_ARGS) load_shard_placement_array(PG_FUNCTION_ARGS)
{ {
int64 shardId = PG_GETARG_INT64(0); int64 shardId = PG_GETARG_INT64(0);
bool onlyFinalized = PG_GETARG_BOOL(1); bool onlyActive = PG_GETARG_BOOL(1);
List *placementList = NIL; List *placementList = NIL;
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
int placementIndex = 0; int placementIndex = 0;
Oid placementTypeId = TEXTOID; Oid placementTypeId = TEXTOID;
StringInfo placementInfo = makeStringInfo(); StringInfo placementInfo = makeStringInfo();
if (onlyFinalized) if (onlyActive)
{ {
placementList = FinalizedShardPlacementList(shardId); placementList = ActiveShardPlacementList(shardId);
} }
else else
{ {

View File

@ -406,7 +406,7 @@ OutShardPlacement(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(placementId); WRITE_UINT64_FIELD(placementId);
WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength); WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, RelayFileState); WRITE_ENUM_FIELD(shardState, ShardState);
WRITE_INT_FIELD(groupId); WRITE_INT_FIELD(groupId);
WRITE_STRING_FIELD(nodeName); WRITE_STRING_FIELD(nodeName);
WRITE_UINT_FIELD(nodePort); WRITE_UINT_FIELD(nodePort);
@ -427,7 +427,7 @@ OutGroupShardPlacement(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(placementId); WRITE_UINT64_FIELD(placementId);
WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength); WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, RelayFileState); WRITE_ENUM_FIELD(shardState, ShardState);
WRITE_INT_FIELD(groupId); WRITE_INT_FIELD(groupId);
} }

View File

@ -316,7 +316,7 @@ ReadShardPlacement(READFUNC_ARGS)
READ_UINT64_FIELD(placementId); READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength); READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState); READ_ENUM_FIELD(shardState, ShardState);
READ_INT_FIELD(groupId); READ_INT_FIELD(groupId);
READ_STRING_FIELD(nodeName); READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort); READ_UINT_FIELD(nodePort);
@ -338,7 +338,7 @@ ReadGroupShardPlacement(READFUNC_ARGS)
READ_UINT64_FIELD(placementId); READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength); READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState); READ_ENUM_FIELD(shardState, ShardState);
READ_INT_FIELD(groupId); READ_INT_FIELD(groupId);
READ_DONE(); READ_DONE();

View File

@ -269,7 +269,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
bool missingOk = false; bool missingOk = false;
ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
char *srcNodeName = sourceShardPlacement->nodeName; char *srcNodeName = sourceShardPlacement->nodeName;
uint32 srcNodePort = sourceShardPlacement->nodePort; uint32 srcNodePort = sourceShardPlacement->nodePort;
bool includeData = true; bool includeData = true;
@ -284,13 +284,12 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
char *tableOwner = TableOwner(shardInterval->relationId); char *tableOwner = TableOwner(shardInterval->relationId);
/* /*
* Although this function is used for reference tables and reference table shard * Although this function is used for reference tables, and reference table shard
* placements always have shardState = FILE_FINALIZED, in case of an upgrade of * placements always have shardState = SHARD_STATE_ACTIVE, in case of an upgrade
* a non-reference table to reference table, unhealty placements may exist. In * of a non-reference table to reference table, unhealty placements may exist.
* this case, we repair the shard placement and update its state in * In this case, repair the shard placement and update its state in pg_dist_placement.
* pg_dist_placement table.
*/ */
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) if (targetPlacement == NULL || targetPlacement->shardState != SHARD_STATE_ACTIVE)
{ {
uint64 placementId = 0; uint64 placementId = 0;
int32 groupId = 0; int32 groupId = 0;
@ -307,13 +306,14 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
groupId = GroupForNode(nodeName, nodePort); groupId = GroupForNode(nodeName, nodePort);
placementId = GetNextPlacementId(); placementId = GetNextPlacementId();
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, groupId); InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0,
groupId);
} }
else else
{ {
groupId = targetPlacement->groupId; groupId = targetPlacement->groupId;
placementId = targetPlacement->placementId; placementId = targetPlacement->placementId;
UpdateShardPlacementState(placementId, FILE_FINALIZED); UpdateShardPlacementState(placementId, SHARD_STATE_ACTIVE);
} }
/* /*
@ -326,7 +326,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
if (ShouldSyncTableMetadata(shardInterval->relationId)) if (ShouldSyncTableMetadata(shardInterval->relationId))
{ {
char *placementCommand = PlacementUpsertCommand(shardId, placementId, char *placementCommand = PlacementUpsertCommand(shardId, placementId,
FILE_FINALIZED, 0, SHARD_STATE_ACTIVE, 0,
groupId); groupId);
SendCommandToWorkersWithMetadata(placementCommand); SendCommandToWorkersWithMetadata(placementCommand);

View File

@ -60,7 +60,7 @@ typedef struct GroupShardPlacement
uint64 placementId; /* sequence that implies this placement creation order */ uint64 placementId; /* sequence that implies this placement creation order */
uint64 shardId; uint64 shardId;
uint64 shardLength; uint64 shardLength;
RelayFileState shardState; ShardState shardState;
int32 groupId; int32 groupId;
} GroupShardPlacement; } GroupShardPlacement;
@ -75,7 +75,7 @@ typedef struct ShardPlacement
uint64 placementId; uint64 placementId;
uint64 shardId; uint64 shardId;
uint64 shardLength; uint64 shardLength;
RelayFileState shardState; ShardState shardState;
int32 groupId; int32 groupId;
/* the rest of the fields aren't from pg_dist_placement */ /* the rest of the fields aren't from pg_dist_placement */
@ -107,8 +107,8 @@ extern void CopyShardPlacement(ShardPlacement *srcPlacement,
extern uint64 ShardLength(uint64 shardId); extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId, extern bool NodeGroupHasShardPlacements(int32 groupId,
bool onlyConsiderActivePlacements); bool onlyConsiderActivePlacements);
extern List * FinalizedShardPlacementList(uint64 shardId); extern List * ActiveShardPlacementList(uint64 shardId);
extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval); extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);

View File

@ -71,7 +71,7 @@
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE" #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE"
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s"
#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')" #define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')"
#define FINALIZED_SHARD_PLACEMENTS_QUERY \ #define ACTIVE_SHARD_PLACEMENTS_QUERY \
"SELECT placementid, nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = " \ "SELECT placementid, nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = " \
INT64_FORMAT INT64_FORMAT
#define UPDATE_SHARD_STATISTICS_QUERY \ #define UPDATE_SHARD_STATISTICS_QUERY \

View File

@ -25,7 +25,7 @@ typedef struct FormData_pg_dist_placement
{ {
int64 placementid; /* global placementId on remote node */ int64 placementid; /* global placementId on remote node */
int64 shardid; /* global shardId on remote node */ int64 shardid; /* global shardId on remote node */
int32 shardstate; /* shard state on remote node; see RelayFileState */ int32 shardstate; /* shard state on remote node; see ShardState */
int64 shardlength; /* shard length on remote node; stored as bigint */ int64 shardlength; /* shard length on remote node; stored as bigint */
int32 groupid; /* the group the shard is placed on */ int32 groupid; /* the group the shard is placed on */
} FormData_pg_dist_placement; } FormData_pg_dist_placement;

View File

@ -26,18 +26,15 @@
#define INVALID_PLACEMENT_ID 0 #define INVALID_PLACEMENT_ID 0
/* /*
* RelayFileState represents last known states of shards on a given node. We * ShardState represents last known states of shards on a given node.
* currently only have shards in finalized or cached state; and set this state
* after shards are sucessfully staged or cached.
*/ */
typedef enum typedef enum
{ {
FILE_INVALID_FIRST = 0, SHARD_STATE_INVALID_FIRST = 0,
FILE_FINALIZED = 1, SHARD_STATE_ACTIVE = 1,
FILE_CACHED = 2, SHARD_STATE_INACTIVE = 3,
FILE_INACTIVE = 3, SHARD_STATE_TO_DELETE = 4,
FILE_TO_DELETE = 4 } ShardState;
} RelayFileState;
/* Function declarations to extend names in DDL commands */ /* Function declarations to extend names in DDL commands */

View File

@ -199,7 +199,7 @@ ORDER BY s.shardid, sp.nodeport;
-- repair while all placements of one shard in colocation group is unhealthy -- repair while all placements of one shard in colocation group is unhealthy
SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: source placement must be in finalized state ERROR: source placement must be in active state
-- status after shard repair -- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM FROM

View File

@ -112,7 +112,7 @@ SELECT load_shard_placement_array(540001, false);
{localhost:xxxxx,localhost:xxxxx} {localhost:xxxxx,localhost:xxxxx}
(1 row) (1 row)
-- only one of which is finalized -- only one of which is active
SELECT load_shard_placement_array(540001, true); SELECT load_shard_placement_array(540001, true);
load_shard_placement_array load_shard_placement_array
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -73,7 +73,7 @@ ERROR: target placement must be in inactive state
UPDATE pg_dist_placement SET shardstate = 3 WHERE groupid = :worker_2_group and shardid = :newshardid; UPDATE pg_dist_placement SET shardstate = 3 WHERE groupid = :worker_2_group and shardid = :newshardid;
-- also try to copy from an inactive placement -- also try to copy from an inactive placement
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port); SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ERROR: source placement must be in finalized state ERROR: source placement must be in active state
-- "copy" this shard from the first placement to the second one -- "copy" this shard from the first placement to the second one
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement master_copy_shard_placement

View File

@ -101,7 +101,7 @@ SELECT load_shard_interval_array(540005, 0);
-- should see two placements -- should see two placements
SELECT load_shard_placement_array(540001, false); SELECT load_shard_placement_array(540001, false);
-- only one of which is finalized -- only one of which is active
SELECT load_shard_placement_array(540001, true); SELECT load_shard_placement_array(540001, true);
-- should see error for non-existent shard -- should see error for non-existent shard