diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 588dda350..9118e1f8c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -92,7 +92,7 @@ static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdT operation, bool alwaysThrowErrorOnFailure, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); -static List * BuildPlacementAccessList(uint32 groupId, List *relationShardList, +static List * BuildPlacementAccessList(int32 groupId, List *relationShardList, ShardPlacementAccessType accessType); static List * GetModifyConnections(Task *task, bool markCritical); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, @@ -890,7 +890,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) * (e.g. in case of a broadcast join) then the shard is skipped. */ List * -BuildPlacementSelectList(uint32 groupId, List *relationShardList) +BuildPlacementSelectList(int32 groupId, List *relationShardList) { return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_SELECT); } @@ -900,7 +900,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList) * BuildPlacementDDLList is a warpper around BuildPlacementAccessList() for DDL access. */ List * -BuildPlacementDDLList(uint32 groupId, List *relationShardList) +BuildPlacementDDLList(int32 groupId, List *relationShardList) { return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_DDL); } @@ -911,7 +911,7 @@ BuildPlacementDDLList(uint32 groupId, List *relationShardList) * relationShardList and the access type. */ static List * -BuildPlacementAccessList(uint32 groupId, List *relationShardList, +BuildPlacementAccessList(int32 groupId, List *relationShardList, ShardPlacementAccessType accessType) { ListCell *relationShardCell = NULL; diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 78a4a00df..4e3a25305 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -316,7 +316,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool { ShardPlacement *sourcePlacement = (ShardPlacement *) lfirst(sourceShardPlacementCell); - uint32 groupId = sourcePlacement->groupId; + int32 groupId = sourcePlacement->groupId; const RelayFileState shardState = FILE_FINALIZED; const uint64 shardSize = 0; uint64 shardPlacementId = 0; diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 9e1296831..40ff5c007 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -250,7 +250,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ * on the group. */ List * -GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId) +GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId) { DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId); List *resultList = NIL; @@ -634,7 +634,7 @@ ShardLength(uint64 shardId) * NodeGroupHasShardPlacements returns whether any active shards are placed on the group */ bool -NodeGroupHasShardPlacements(uint32 groupId, bool onlyConsiderActivePlacements) +NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements) { const int scanKeyCount = (onlyConsiderActivePlacements ? 2 : 1); const bool indexOK = false; @@ -649,7 +649,7 @@ NodeGroupHasShardPlacements(uint32 groupId, bool onlyConsiderActivePlacements) AccessShareLock); ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid, - BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(groupId)); + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); if (onlyConsiderActivePlacements) { ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate, @@ -852,7 +852,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) datumArray[Anum_pg_dist_placement_shardlength - 1]); shardPlacement->shardState = DatumGetUInt32( datumArray[Anum_pg_dist_placement_shardstate - 1]); - shardPlacement->groupId = DatumGetUInt32( + shardPlacement->groupId = DatumGetInt32( datumArray[Anum_pg_dist_placement_groupid - 1]); return shardPlacement; @@ -922,7 +922,7 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType, uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, char shardState, uint64 shardLength, - uint32 groupId) + int32 groupId) { Relation pgDistPlacement = NULL; TupleDesc tupleDescriptor = NULL; @@ -942,7 +942,7 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId, values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId); values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(shardState); values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength); - values[Anum_pg_dist_placement_groupid - 1] = Int64GetDatum(groupId); + values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId); /* open shard placement relation and insert new tuple */ pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index fa06c4515..95a69a62a 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -766,7 +766,7 @@ UpdateShardStatistics(int64 shardId) { ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); uint64 placementId = placement->placementId; - uint32 groupId = placement->groupId; + int32 groupId = placement->groupId; DeleteShardPlacementRow(placementId); InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize, diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 418440f09..82d0b8ad0 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -51,7 +51,7 @@ #include "utils/tqual.h" -static char * LocalGroupIdUpdateCommand(uint32 groupId); +static char * LocalGroupIdUpdateCommand(int32 groupId); static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); static List * SequenceDDLCommandsForTable(Oid relationId); static void EnsureSupportedSequenceColumnType(Oid sequenceOid); @@ -824,7 +824,7 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId) */ char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, - uint64 shardLength, uint32 groupId) + uint64 shardLength, int32 groupId) { StringInfo command = makeStringInfo(); @@ -840,7 +840,7 @@ PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, * of a worker and returns the command in a string. */ static char * -LocalGroupIdUpdateCommand(uint32 groupId) +LocalGroupIdUpdateCommand(int32 groupId) { StringInfo updateCommand = makeStringInfo(); diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 2a4a6e140..f9028c379 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1323,7 +1323,7 @@ Assign2PCIdentifier(MultiConnection *connection) */ bool ParsePreparedTransactionName(char *preparedTransactionName, - int *groupId, int *procId, + int32 *groupId, int *procId, uint64 *transactionNumber, uint32 *connectionNumber) { diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index fde13608e..81746dcd0 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -78,7 +78,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) * prepared transaction should be committed. */ void -LogTransactionRecord(int groupId, char *transactionName) +LogTransactionRecord(int32 groupId, char *transactionName) { Relation pgDistTransaction = NULL; TupleDesc tupleDescriptor = NULL; @@ -141,7 +141,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode) { int recoveredTransactionCount = 0; - int groupId = workerNode->groupId; + int32 groupId = workerNode->groupId; char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; @@ -461,7 +461,7 @@ PendingWorkerTransactionList(MultiConnection *connection) static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName) { - int groupId = 0; + int32 groupId = 0; int procId = 0; uint32 connectionNumber = 0; uint64 transactionNumber = 0; diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 94f6a4d09..ca8f77d67 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -402,7 +402,7 @@ OutShardPlacement(OUTFUNC_ARGS) WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardLength); WRITE_ENUM_FIELD(shardState, RelayFileState); - WRITE_UINT_FIELD(groupId); + WRITE_INT_FIELD(groupId); WRITE_STRING_FIELD(nodeName); WRITE_UINT_FIELD(nodePort); /* so we can deal with 0 */ @@ -422,7 +422,7 @@ OutGroupShardPlacement(OUTFUNC_ARGS) WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardLength); WRITE_ENUM_FIELD(shardState, RelayFileState); - WRITE_UINT_FIELD(groupId); + WRITE_INT_FIELD(groupId); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index bd02f10b9..cbd573142 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -312,7 +312,7 @@ ReadShardPlacement(READFUNC_ARGS) READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardLength); READ_ENUM_FIELD(shardState, RelayFileState); - READ_UINT_FIELD(groupId); + READ_INT_FIELD(groupId); READ_STRING_FIELD(nodeName); READ_UINT_FIELD(nodePort); /* so we can deal with 0 */ @@ -333,7 +333,7 @@ ReadGroupShardPlacement(READFUNC_ARGS) READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardLength); READ_ENUM_FIELD(shardState, RelayFileState); - READ_UINT_FIELD(groupId); + READ_INT_FIELD(groupId); READ_DONE(); } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 2973ec709..9d92e748d 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -160,7 +160,7 @@ static int WorkerNodeCount = 0; static bool workerNodeHashValid = false; /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ -static int LocalGroupId = -1; +static int32 LocalGroupId = -1; /* built first time through in InitializePartitionCache */ static ScanKeyData DistPartitionScanKey[1]; @@ -210,7 +210,7 @@ static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, static void CachedRelationLookup(const char *relationName, Oid *cachedOid); static ShardPlacement * ResolveGroupShardPlacement( GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); -static WorkerNode * LookupNodeForGroup(uint32 groupid); +static WorkerNode * LookupNodeForGroup(int32 groupId); static Oid LookupEnumValueId(Oid typeId, char *valueName); static void InvalidateEntireDistCache(void); @@ -474,7 +474,7 @@ LoadShardPlacement(uint64 shardId, uint64 placementId) * on the group. */ ShardPlacement * -FindShardPlacementOnGroup(uint32 groupId, uint64 shardId) +FindShardPlacementOnGroup(int32 groupId, uint64 shardId) { ShardCacheEntry *shardEntry = NULL; DistTableCacheEntry *tableEntry = NULL; @@ -516,7 +516,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); - uint32 groupId = groupShardPlacement->groupId; + int32 groupId = groupShardPlacement->groupId; WorkerNode *workerNode = LookupNodeForGroup(groupId); /* copy everything into shardPlacement but preserve the header */ @@ -583,7 +583,7 @@ LookupNodeByNodeId(uint32 nodeId) * appropriate error message. */ static WorkerNode * -LookupNodeForGroup(uint32 groupId) +LookupNodeForGroup(int32 groupId) { bool foundAnyNodes = false; int workerNodeIndex = 0; @@ -593,7 +593,7 @@ LookupNodeForGroup(uint32 groupId) for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++) { WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex]; - uint32 workerNodeGroupId = workerNode->groupId; + int32 workerNodeGroupId = workerNode->groupId; if (workerNodeGroupId != groupId) { continue; @@ -609,7 +609,7 @@ LookupNodeForGroup(uint32 groupId) if (!foundAnyNodes) { - ereport(ERROR, (errmsg("there is a shard placement in node group %u but " + ereport(ERROR, (errmsg("there is a shard placement in node group %d but " "there are no nodes in that group", groupId))); } @@ -617,13 +617,13 @@ LookupNodeForGroup(uint32 groupId) { case USE_SECONDARY_NODES_NEVER: { - ereport(ERROR, (errmsg("node group %u does not have a primary node", + ereport(ERROR, (errmsg("node group %d does not have a primary node", groupId))); } case USE_SECONDARY_NODES_ALWAYS: { - ereport(ERROR, (errmsg("node group %u does not have a secondary node", + ereport(ERROR, (errmsg("node group %d does not have a secondary node", groupId))); } @@ -2801,7 +2801,7 @@ RegisterWorkerNodeCacheCallbacks(void) * that pg_dist_local_node_group has exactly one row and has at least one column. * Otherwise, the function errors out. */ -int +int32 GetLocalGroupId(void) { SysScanDesc scanDescriptor = NULL; @@ -2809,7 +2809,7 @@ GetLocalGroupId(void) int scanKeyCount = 0; HeapTuple heapTuple = NULL; TupleDesc tupleDescriptor = NULL; - Oid groupId = InvalidOid; + int32 groupId = 0; Relation pgDistLocalGroupId = NULL; Oid localGroupTableOid = InvalidOid; @@ -2846,7 +2846,7 @@ GetLocalGroupId(void) Anum_pg_dist_local_groupid, tupleDescriptor, &isNull); - groupId = DatumGetUInt32(groupIdDatum); + groupId = DatumGetInt32(groupIdDatum); } else { diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 8d41966ff..435bb0f19 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -63,7 +63,7 @@ static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static int GetNextNodeId(void); -static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, +static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, int32 groupId, char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster); static void DeleteNodeRow(char *nodename, int32 nodeport); @@ -395,7 +395,7 @@ WorkerNodeIsReadable(WorkerNode *workerNode) * it will set the bool groupContainsNodes references to true. */ WorkerNode * -PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes) +PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes) { WorkerNode *workerNode = NULL; HASH_SEQ_STATUS status; @@ -405,7 +405,7 @@ PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes) while ((workerNode = hash_seq_search(&status)) != NULL) { - uint32 workerNodeGroupId = workerNode->groupId; + int32 workerNodeGroupId = workerNode->groupId; if (workerNodeGroupId != groupId) { continue; @@ -1116,7 +1116,7 @@ GenerateNodeTuple(WorkerNode *workerNode) memset(isNulls, false, sizeof(isNulls)); values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(workerNode->nodeId); - values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(workerNode->groupId); + values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(workerNode->groupId); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(workerNode->workerName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); @@ -1166,7 +1166,7 @@ GetNextGroupId() SetUserIdAndSecContext(savedUserId, savedSecurityContext); - groupId = DatumGetUInt32(groupIdDatum); + groupId = DatumGetInt32(groupIdDatum); return groupId; } @@ -1232,7 +1232,7 @@ EnsureCoordinator(void) * an existing group. If you don't it's possible for the metadata to become inconsistent. */ static void -InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack, +InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster) { Relation pgDistNode = NULL; @@ -1249,7 +1249,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char * memset(isNulls, false, sizeof(isNulls)); values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); - values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(groupId); + values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(groupId); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); @@ -1500,7 +1500,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); workerNode->nodeId = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeid - 1]); workerNode->workerPort = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeport - 1]); - workerNode->groupId = DatumGetUInt32(datumArray[Anum_pg_dist_node_groupid - 1]); + workerNode->groupId = DatumGetInt32(datumArray[Anum_pg_dist_node_groupid - 1]); strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index f5b1608a2..2661eaf71 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -306,7 +306,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) { uint64 placementId = 0; - uint32 groupId = 0; + int32 groupId = 0; ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d", get_rel_name(shardInterval->relationId), nodeName, @@ -410,7 +410,7 @@ CreateReferenceTableColocationId() * group of reference tables. It is caller's responsibility to do that if it is necessary. */ void -DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId) +DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId) { List *referenceTableList = ReferenceTableOidList(); List *referenceShardIntervalList = NIL; diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 125f17cb4..3fba2b449 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -79,7 +79,7 @@ typedef struct GroupShardPlacement uint64 shardId; uint64 shardLength; RelayFileState shardState; - uint32 groupId; + int32 groupId; } GroupShardPlacement; @@ -94,7 +94,7 @@ typedef struct ShardPlacement uint64 shardId; uint64 shardLength; RelayFileState shardState; - uint32 groupId; + int32 groupId; /* the rest of the fields aren't from pg_dist_placement */ char *nodeName; @@ -122,13 +122,13 @@ extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInt extern void CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement); extern uint64 ShardLength(uint64 shardId); -extern bool NodeGroupHasShardPlacements(uint32 groupId, +extern bool NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements); extern List * FinalizedShardPlacementList(uint64 shardId); extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); extern List * BuildShardPlacementList(ShardInterval *shardInterval); extern List * AllShardPlacementsOnNodeGroup(int32 groupId); -extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId); +extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, @@ -136,7 +136,7 @@ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, extern void DeleteShardRow(uint64 shardId); extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, char shardState, uint64 shardLength, - uint32 groupId); + int32 groupId); extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId, char replicationModel); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index bd410116d..6ade86ed8 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -93,11 +93,11 @@ extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern Oid RelationIdForShard(uint64 shardId); extern bool ReferenceTableShardId(uint64 shardId); -extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId); +extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); -extern int GetLocalGroupId(void); +extern int32 GetLocalGroupId(void); extern List * DistTableOidList(void); extern Oid LookupShardRelation(int64 shardId, bool missing_ok); extern List * ShardPlacementList(uint64 shardId); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 05f569a05..98e7b3733 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -35,7 +35,7 @@ extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * CreateSchemaDDLCommand(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, - uint64 shardLength, uint32 groupId); + uint64 shardLength, int32 groupId); extern void CreateTableMetadataOnWorkers(Oid relationId); diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 86106c211..003d205f3 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -53,7 +53,7 @@ extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, /* helper functions */ extern bool TaskListRequires2PC(List *taskList); -extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList); -extern List * BuildPlacementDDLList(uint32 groupId, List *relationShardList); +extern List * BuildPlacementSelectList(int32 groupId, List *relationShardList); +extern List * BuildPlacementDDLList(int32 groupId, List *relationShardList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 3e1365844..480e77a1b 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -14,7 +14,7 @@ extern uint32 CreateReferenceTableColocationId(void); extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); -extern void DeleteAllReferenceTablePlacementsFromNodeGroup(uint32 groupId); +extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId); extern List * ReferenceTableOidList(void); extern int CompareOids(const void *leftElement, const void *rightElement); diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 92a065e11..021fb80d5 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -81,7 +81,7 @@ typedef struct RemoteTransaction /* utility functions for dealing with remote transactions */ -extern bool ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, +extern bool ParsePreparedTransactionName(char *preparedTransactionName, int32 *groupId, int *procId, uint64 *transactionNumber, uint32 *connectionNumber); diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 9f359fc18..410f24315 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -17,7 +17,7 @@ extern int Recover2PCInterval; /* Functions declarations for worker transactions */ -extern void LogTransactionRecord(int groupId, char *transactionName); +extern void LogTransactionRecord(int32 groupId, char *transactionName); extern int RecoverTwoPhaseCommits(void); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 888d2d7ef..ffa5210c5 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -41,7 +41,7 @@ typedef struct WorkerNode uint32 nodeId; /* node's unique id, key of the hash table */ uint32 workerPort; /* node's port */ char workerName[WORKER_LENGTH]; /* node's name */ - uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ + int32 groupId; /* node's groupId; same for the nodes that are in the same group */ char workerRack[WORKER_LENGTH]; /* node's network location */ bool hasMetadata; /* node gets metadata changes */ bool isActive; /* node's state */ @@ -72,7 +72,7 @@ extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters); extern void EnsureCoordinator(void); extern uint32 GroupForNode(char *nodeName, int32 nodePorT); -extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes); +extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes); extern bool WorkerNodeIsPrimary(WorkerNode *worker); extern bool WorkerNodeIsSecondary(WorkerNode *worker); extern bool WorkerNodeIsReadable(WorkerNode *worker);