Fix some of the casts for groupId (#2609)

A small change which partially addresses #2608.
pull/2621/head
Hadi Moshayedi 2019-03-05 12:06:44 -08:00 committed by GitHub
parent 900ffa76f5
commit f4d3b94e22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 60 additions and 60 deletions

View File

@ -92,7 +92,7 @@ static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdT
operation, bool alwaysThrowErrorOnFailure, bool operation, bool alwaysThrowErrorOnFailure, bool
expectResults); expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * BuildPlacementAccessList(uint32 groupId, List *relationShardList, static List * BuildPlacementAccessList(int32 groupId, List *relationShardList,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);
static List * GetModifyConnections(Task *task, bool markCritical); static List * GetModifyConnections(Task *task, bool markCritical);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults, static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@ -890,7 +890,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
* (e.g. in case of a broadcast join) then the shard is skipped. * (e.g. in case of a broadcast join) then the shard is skipped.
*/ */
List * List *
BuildPlacementSelectList(uint32 groupId, List *relationShardList) BuildPlacementSelectList(int32 groupId, List *relationShardList)
{ {
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_SELECT); return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_SELECT);
} }
@ -900,7 +900,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList)
* BuildPlacementDDLList is a warpper around BuildPlacementAccessList() for DDL access. * BuildPlacementDDLList is a warpper around BuildPlacementAccessList() for DDL access.
*/ */
List * List *
BuildPlacementDDLList(uint32 groupId, List *relationShardList) BuildPlacementDDLList(int32 groupId, List *relationShardList)
{ {
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_DDL); return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_DDL);
} }
@ -911,7 +911,7 @@ BuildPlacementDDLList(uint32 groupId, List *relationShardList)
* relationShardList and the access type. * relationShardList and the access type.
*/ */
static List * static List *
BuildPlacementAccessList(uint32 groupId, List *relationShardList, BuildPlacementAccessList(int32 groupId, List *relationShardList,
ShardPlacementAccessType accessType) ShardPlacementAccessType accessType)
{ {
ListCell *relationShardCell = NULL; ListCell *relationShardCell = NULL;

View File

@ -316,7 +316,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
{ {
ShardPlacement *sourcePlacement = ShardPlacement *sourcePlacement =
(ShardPlacement *) lfirst(sourceShardPlacementCell); (ShardPlacement *) lfirst(sourceShardPlacementCell);
uint32 groupId = sourcePlacement->groupId; int32 groupId = sourcePlacement->groupId;
const RelayFileState shardState = FILE_FINALIZED; const RelayFileState shardState = FILE_FINALIZED;
const uint64 shardSize = 0; const uint64 shardSize = 0;
uint64 shardPlacementId = 0; uint64 shardPlacementId = 0;

View File

@ -250,7 +250,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
* on the group. * on the group.
*/ */
List * List *
GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId) GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId)
{ {
DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId); DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId);
List *resultList = NIL; List *resultList = NIL;
@ -634,7 +634,7 @@ ShardLength(uint64 shardId)
* NodeGroupHasShardPlacements returns whether any active shards are placed on the group * NodeGroupHasShardPlacements returns whether any active shards are placed on the group
*/ */
bool bool
NodeGroupHasShardPlacements(uint32 groupId, bool onlyConsiderActivePlacements) NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements)
{ {
const int scanKeyCount = (onlyConsiderActivePlacements ? 2 : 1); const int scanKeyCount = (onlyConsiderActivePlacements ? 2 : 1);
const bool indexOK = false; const bool indexOK = false;
@ -649,7 +649,7 @@ NodeGroupHasShardPlacements(uint32 groupId, bool onlyConsiderActivePlacements)
AccessShareLock); AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid, ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(groupId)); BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
if (onlyConsiderActivePlacements) if (onlyConsiderActivePlacements)
{ {
ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate, ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate,
@ -852,7 +852,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
datumArray[Anum_pg_dist_placement_shardlength - 1]); datumArray[Anum_pg_dist_placement_shardlength - 1]);
shardPlacement->shardState = DatumGetUInt32( shardPlacement->shardState = DatumGetUInt32(
datumArray[Anum_pg_dist_placement_shardstate - 1]); datumArray[Anum_pg_dist_placement_shardstate - 1]);
shardPlacement->groupId = DatumGetUInt32( shardPlacement->groupId = DatumGetInt32(
datumArray[Anum_pg_dist_placement_groupid - 1]); datumArray[Anum_pg_dist_placement_groupid - 1]);
return shardPlacement; return shardPlacement;
@ -922,7 +922,7 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
uint64 uint64
InsertShardPlacementRow(uint64 shardId, uint64 placementId, InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength, char shardState, uint64 shardLength,
uint32 groupId) int32 groupId)
{ {
Relation pgDistPlacement = NULL; Relation pgDistPlacement = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
@ -942,7 +942,7 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId); values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId);
values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(shardState); values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(shardState);
values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength); values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength);
values[Anum_pg_dist_placement_groupid - 1] = Int64GetDatum(groupId); values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId);
/* open shard placement relation and insert new tuple */ /* open shard placement relation and insert new tuple */
pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock); pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock);

View File

@ -766,7 +766,7 @@ UpdateShardStatistics(int64 shardId)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 placementId = placement->placementId; uint64 placementId = placement->placementId;
uint32 groupId = placement->groupId; int32 groupId = placement->groupId;
DeleteShardPlacementRow(placementId); DeleteShardPlacementRow(placementId);
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize, InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize,

View File

@ -51,7 +51,7 @@
#include "utils/tqual.h" #include "utils/tqual.h"
static char * LocalGroupIdUpdateCommand(uint32 groupId); static char * LocalGroupIdUpdateCommand(int32 groupId);
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
static List * SequenceDDLCommandsForTable(Oid relationId); static List * SequenceDDLCommandsForTable(Oid relationId);
static void EnsureSupportedSequenceColumnType(Oid sequenceOid); static void EnsureSupportedSequenceColumnType(Oid sequenceOid);
@ -824,7 +824,7 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
*/ */
char * char *
PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, uint32 groupId) uint64 shardLength, int32 groupId)
{ {
StringInfo command = makeStringInfo(); StringInfo command = makeStringInfo();
@ -840,7 +840,7 @@ PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
* of a worker and returns the command in a string. * of a worker and returns the command in a string.
*/ */
static char * static char *
LocalGroupIdUpdateCommand(uint32 groupId) LocalGroupIdUpdateCommand(int32 groupId)
{ {
StringInfo updateCommand = makeStringInfo(); StringInfo updateCommand = makeStringInfo();

View File

@ -1323,7 +1323,7 @@ Assign2PCIdentifier(MultiConnection *connection)
*/ */
bool bool
ParsePreparedTransactionName(char *preparedTransactionName, ParsePreparedTransactionName(char *preparedTransactionName,
int *groupId, int *procId, int32 *groupId, int *procId,
uint64 *transactionNumber, uint64 *transactionNumber,
uint32 *connectionNumber) uint32 *connectionNumber)
{ {

View File

@ -78,7 +78,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
* prepared transaction should be committed. * prepared transaction should be committed.
*/ */
void void
LogTransactionRecord(int groupId, char *transactionName) LogTransactionRecord(int32 groupId, char *transactionName)
{ {
Relation pgDistTransaction = NULL; Relation pgDistTransaction = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
@ -141,7 +141,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
{ {
int recoveredTransactionCount = 0; int recoveredTransactionCount = 0;
int groupId = workerNode->groupId; int32 groupId = workerNode->groupId;
char *nodeName = workerNode->workerName; char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
@ -461,7 +461,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
static bool static bool
IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName) IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName)
{ {
int groupId = 0; int32 groupId = 0;
int procId = 0; int procId = 0;
uint32 connectionNumber = 0; uint32 connectionNumber = 0;
uint64 transactionNumber = 0; uint64 transactionNumber = 0;

View File

@ -402,7 +402,7 @@ OutShardPlacement(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength); WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, RelayFileState); WRITE_ENUM_FIELD(shardState, RelayFileState);
WRITE_UINT_FIELD(groupId); WRITE_INT_FIELD(groupId);
WRITE_STRING_FIELD(nodeName); WRITE_STRING_FIELD(nodeName);
WRITE_UINT_FIELD(nodePort); WRITE_UINT_FIELD(nodePort);
/* so we can deal with 0 */ /* so we can deal with 0 */
@ -422,7 +422,7 @@ OutGroupShardPlacement(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength); WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, RelayFileState); WRITE_ENUM_FIELD(shardState, RelayFileState);
WRITE_UINT_FIELD(groupId); WRITE_INT_FIELD(groupId);
} }

View File

@ -312,7 +312,7 @@ ReadShardPlacement(READFUNC_ARGS)
READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength); READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState); READ_ENUM_FIELD(shardState, RelayFileState);
READ_UINT_FIELD(groupId); READ_INT_FIELD(groupId);
READ_STRING_FIELD(nodeName); READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort); READ_UINT_FIELD(nodePort);
/* so we can deal with 0 */ /* so we can deal with 0 */
@ -333,7 +333,7 @@ ReadGroupShardPlacement(READFUNC_ARGS)
READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength); READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState); READ_ENUM_FIELD(shardState, RelayFileState);
READ_UINT_FIELD(groupId); READ_INT_FIELD(groupId);
READ_DONE(); READ_DONE();
} }

View File

@ -160,7 +160,7 @@ static int WorkerNodeCount = 0;
static bool workerNodeHashValid = false; static bool workerNodeHashValid = false;
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
static int LocalGroupId = -1; static int32 LocalGroupId = -1;
/* built first time through in InitializePartitionCache */ /* built first time through in InitializePartitionCache */
static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistPartitionScanKey[1];
@ -210,7 +210,7 @@ static ShardInterval * TupleToShardInterval(HeapTuple heapTuple,
static void CachedRelationLookup(const char *relationName, Oid *cachedOid); static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
static ShardPlacement * ResolveGroupShardPlacement( static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry);
static WorkerNode * LookupNodeForGroup(uint32 groupid); static WorkerNode * LookupNodeForGroup(int32 groupId);
static Oid LookupEnumValueId(Oid typeId, char *valueName); static Oid LookupEnumValueId(Oid typeId, char *valueName);
static void InvalidateEntireDistCache(void); static void InvalidateEntireDistCache(void);
@ -474,7 +474,7 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
* on the group. * on the group.
*/ */
ShardPlacement * ShardPlacement *
FindShardPlacementOnGroup(uint32 groupId, uint64 shardId) FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
{ {
ShardCacheEntry *shardEntry = NULL; ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL; DistTableCacheEntry *tableEntry = NULL;
@ -516,7 +516,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
uint32 groupId = groupShardPlacement->groupId; int32 groupId = groupShardPlacement->groupId;
WorkerNode *workerNode = LookupNodeForGroup(groupId); WorkerNode *workerNode = LookupNodeForGroup(groupId);
/* copy everything into shardPlacement but preserve the header */ /* copy everything into shardPlacement but preserve the header */
@ -583,7 +583,7 @@ LookupNodeByNodeId(uint32 nodeId)
* appropriate error message. * appropriate error message.
*/ */
static WorkerNode * static WorkerNode *
LookupNodeForGroup(uint32 groupId) LookupNodeForGroup(int32 groupId)
{ {
bool foundAnyNodes = false; bool foundAnyNodes = false;
int workerNodeIndex = 0; int workerNodeIndex = 0;
@ -593,7 +593,7 @@ LookupNodeForGroup(uint32 groupId)
for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++) for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++)
{ {
WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex]; WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex];
uint32 workerNodeGroupId = workerNode->groupId; int32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId != groupId) if (workerNodeGroupId != groupId)
{ {
continue; continue;
@ -609,7 +609,7 @@ LookupNodeForGroup(uint32 groupId)
if (!foundAnyNodes) if (!foundAnyNodes)
{ {
ereport(ERROR, (errmsg("there is a shard placement in node group %u but " ereport(ERROR, (errmsg("there is a shard placement in node group %d but "
"there are no nodes in that group", groupId))); "there are no nodes in that group", groupId)));
} }
@ -617,13 +617,13 @@ LookupNodeForGroup(uint32 groupId)
{ {
case USE_SECONDARY_NODES_NEVER: case USE_SECONDARY_NODES_NEVER:
{ {
ereport(ERROR, (errmsg("node group %u does not have a primary node", ereport(ERROR, (errmsg("node group %d does not have a primary node",
groupId))); groupId)));
} }
case USE_SECONDARY_NODES_ALWAYS: case USE_SECONDARY_NODES_ALWAYS:
{ {
ereport(ERROR, (errmsg("node group %u does not have a secondary node", ereport(ERROR, (errmsg("node group %d does not have a secondary node",
groupId))); groupId)));
} }
@ -2801,7 +2801,7 @@ RegisterWorkerNodeCacheCallbacks(void)
* that pg_dist_local_node_group has exactly one row and has at least one column. * that pg_dist_local_node_group has exactly one row and has at least one column.
* Otherwise, the function errors out. * Otherwise, the function errors out.
*/ */
int int32
GetLocalGroupId(void) GetLocalGroupId(void)
{ {
SysScanDesc scanDescriptor = NULL; SysScanDesc scanDescriptor = NULL;
@ -2809,7 +2809,7 @@ GetLocalGroupId(void)
int scanKeyCount = 0; int scanKeyCount = 0;
HeapTuple heapTuple = NULL; HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Oid groupId = InvalidOid; int32 groupId = 0;
Relation pgDistLocalGroupId = NULL; Relation pgDistLocalGroupId = NULL;
Oid localGroupTableOid = InvalidOid; Oid localGroupTableOid = InvalidOid;
@ -2846,7 +2846,7 @@ GetLocalGroupId(void)
Anum_pg_dist_local_groupid, Anum_pg_dist_local_groupid,
tupleDescriptor, &isNull); tupleDescriptor, &isNull);
groupId = DatumGetUInt32(groupIdDatum); groupId = DatumGetInt32(groupIdDatum);
} }
else else
{ {

View File

@ -63,7 +63,7 @@ static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
static Datum GenerateNodeTuple(WorkerNode *workerNode); static Datum GenerateNodeTuple(WorkerNode *workerNode);
static int32 GetNextGroupId(void); static int32 GetNextGroupId(void);
static int GetNextNodeId(void); static int GetNextNodeId(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, int32 groupId,
char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole, char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole,
char *nodeCluster); char *nodeCluster);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
@ -395,7 +395,7 @@ WorkerNodeIsReadable(WorkerNode *workerNode)
* it will set the bool groupContainsNodes references to true. * it will set the bool groupContainsNodes references to true.
*/ */
WorkerNode * WorkerNode *
PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes) PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
{ {
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
@ -405,7 +405,7 @@ PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes)
while ((workerNode = hash_seq_search(&status)) != NULL) while ((workerNode = hash_seq_search(&status)) != NULL)
{ {
uint32 workerNodeGroupId = workerNode->groupId; int32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId != groupId) if (workerNodeGroupId != groupId)
{ {
continue; continue;
@ -1116,7 +1116,7 @@ GenerateNodeTuple(WorkerNode *workerNode)
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(workerNode->nodeId); values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(workerNode->nodeId);
values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(workerNode->groupId); values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(workerNode->groupId);
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack);
@ -1166,7 +1166,7 @@ GetNextGroupId()
SetUserIdAndSecContext(savedUserId, savedSecurityContext); SetUserIdAndSecContext(savedUserId, savedSecurityContext);
groupId = DatumGetUInt32(groupIdDatum); groupId = DatumGetInt32(groupIdDatum);
return groupId; return groupId;
} }
@ -1232,7 +1232,7 @@ EnsureCoordinator(void)
* an existing group. If you don't it's possible for the metadata to become inconsistent. * an existing group. If you don't it's possible for the metadata to become inconsistent.
*/ */
static void static void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack, InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster) bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster)
{ {
Relation pgDistNode = NULL; Relation pgDistNode = NULL;
@ -1249,7 +1249,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(groupId); values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(groupId);
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
@ -1500,7 +1500,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
workerNode->nodeId = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeid - 1]); workerNode->nodeId = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeid - 1]);
workerNode->workerPort = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeport - 1]); workerNode->workerPort = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeport - 1]);
workerNode->groupId = DatumGetUInt32(datumArray[Anum_pg_dist_node_groupid - 1]); workerNode->groupId = DatumGetInt32(datumArray[Anum_pg_dist_node_groupid - 1]);
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]); workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]);

View File

@ -306,7 +306,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
{ {
uint64 placementId = 0; uint64 placementId = 0;
uint32 groupId = 0; int32 groupId = 0;
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d", ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
get_rel_name(shardInterval->relationId), nodeName, get_rel_name(shardInterval->relationId), nodeName,
@ -410,7 +410,7 @@ CreateReferenceTableColocationId()
* group of reference tables. It is caller's responsibility to do that if it is necessary. * group of reference tables. It is caller's responsibility to do that if it is necessary.
*/ */
void void
DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId) DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId)
{ {
List *referenceTableList = ReferenceTableOidList(); List *referenceTableList = ReferenceTableOidList();
List *referenceShardIntervalList = NIL; List *referenceShardIntervalList = NIL;

View File

@ -79,7 +79,7 @@ typedef struct GroupShardPlacement
uint64 shardId; uint64 shardId;
uint64 shardLength; uint64 shardLength;
RelayFileState shardState; RelayFileState shardState;
uint32 groupId; int32 groupId;
} GroupShardPlacement; } GroupShardPlacement;
@ -94,7 +94,7 @@ typedef struct ShardPlacement
uint64 shardId; uint64 shardId;
uint64 shardLength; uint64 shardLength;
RelayFileState shardState; RelayFileState shardState;
uint32 groupId; int32 groupId;
/* the rest of the fields aren't from pg_dist_placement */ /* the rest of the fields aren't from pg_dist_placement */
char *nodeName; char *nodeName;
@ -122,13 +122,13 @@ extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInt
extern void CopyShardPlacement(ShardPlacement *srcPlacement, extern void CopyShardPlacement(ShardPlacement *srcPlacement,
ShardPlacement *destPlacement); ShardPlacement *destPlacement);
extern uint64 ShardLength(uint64 shardId); extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(uint32 groupId, extern bool NodeGroupHasShardPlacements(int32 groupId,
bool onlyConsiderActivePlacements); bool onlyConsiderActivePlacements);
extern List * FinalizedShardPlacementList(uint64 shardId); extern List * FinalizedShardPlacementList(uint64 shardId);
extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval); extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
/* Function declarations to modify shard and shard placement data */ /* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
@ -136,7 +136,7 @@ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
extern void DeleteShardRow(uint64 shardId); extern void DeleteShardRow(uint64 shardId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength, char shardState, uint64 shardLength,
uint32 groupId); int32 groupId);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId, Var *distributionColumn, uint32 colocationId,
char replicationModel); char replicationModel);

View File

@ -93,11 +93,11 @@ extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardInterval * LoadShardInterval(uint64 shardId);
extern Oid RelationIdForShard(uint64 shardId); extern Oid RelationIdForShard(uint64 shardId);
extern bool ReferenceTableShardId(uint64 shardId); extern bool ReferenceTableShardId(uint64 shardId);
extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId); extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId);
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void); extern int32 GetLocalGroupId(void);
extern List * DistTableOidList(void); extern List * DistTableOidList(void);
extern Oid LookupShardRelation(int64 shardId, bool missing_ok); extern Oid LookupShardRelation(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId); extern List * ShardPlacementList(uint64 shardId);

View File

@ -35,7 +35,7 @@ extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
extern char * CreateSchemaDDLCommand(Oid schemaId); extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, uint32 groupId); uint64 shardLength, int32 groupId);
extern void CreateTableMetadataOnWorkers(Oid relationId); extern void CreateTableMetadataOnWorkers(Oid relationId);

View File

@ -53,7 +53,7 @@ extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
/* helper functions */ /* helper functions */
extern bool TaskListRequires2PC(List *taskList); extern bool TaskListRequires2PC(List *taskList);
extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList); extern List * BuildPlacementSelectList(int32 groupId, List *relationShardList);
extern List * BuildPlacementDDLList(uint32 groupId, List *relationShardList); extern List * BuildPlacementDDLList(int32 groupId, List *relationShardList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */ #endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -14,7 +14,7 @@
extern uint32 CreateReferenceTableColocationId(void); extern uint32 CreateReferenceTableColocationId(void);
extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId); extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
extern List * ReferenceTableOidList(void); extern List * ReferenceTableOidList(void);
extern int CompareOids(const void *leftElement, const void *rightElement); extern int CompareOids(const void *leftElement, const void *rightElement);

View File

@ -81,7 +81,7 @@ typedef struct RemoteTransaction
/* utility functions for dealing with remote transactions */ /* utility functions for dealing with remote transactions */
extern bool ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, extern bool ParsePreparedTransactionName(char *preparedTransactionName, int32 *groupId,
int *procId, uint64 *transactionNumber, int *procId, uint64 *transactionNumber,
uint32 *connectionNumber); uint32 *connectionNumber);

View File

@ -17,7 +17,7 @@ extern int Recover2PCInterval;
/* Functions declarations for worker transactions */ /* Functions declarations for worker transactions */
extern void LogTransactionRecord(int groupId, char *transactionName); extern void LogTransactionRecord(int32 groupId, char *transactionName);
extern int RecoverTwoPhaseCommits(void); extern int RecoverTwoPhaseCommits(void);

View File

@ -41,7 +41,7 @@ typedef struct WorkerNode
uint32 nodeId; /* node's unique id, key of the hash table */ uint32 nodeId; /* node's unique id, key of the hash table */
uint32 workerPort; /* node's port */ uint32 workerPort; /* node's port */
char workerName[WORKER_LENGTH]; /* node's name */ char workerName[WORKER_LENGTH]; /* node's name */
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ int32 groupId; /* node's groupId; same for the nodes that are in the same group */
char workerRack[WORKER_LENGTH]; /* node's network location */ char workerRack[WORKER_LENGTH]; /* node's network location */
bool hasMetadata; /* node gets metadata changes */ bool hasMetadata; /* node gets metadata changes */
bool isActive; /* node's state */ bool isActive; /* node's state */
@ -72,7 +72,7 @@ extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters); extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void); extern void EnsureCoordinator(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePorT); extern uint32 GroupForNode(char *nodeName, int32 nodePorT);
extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes); extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes);
extern bool WorkerNodeIsPrimary(WorkerNode *worker); extern bool WorkerNodeIsPrimary(WorkerNode *worker);
extern bool WorkerNodeIsSecondary(WorkerNode *worker); extern bool WorkerNodeIsSecondary(WorkerNode *worker);
extern bool WorkerNodeIsReadable(WorkerNode *worker); extern bool WorkerNodeIsReadable(WorkerNode *worker);