mirror of https://github.com/citusdata/citus.git
alternative example of foreach with index tracking
parent
19f28eabae
commit
37604c41c1
|
@ -220,14 +220,15 @@ AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt)
|
||||||
appendStringInfoString(buf, " (");
|
appendStringInfoString(buf, " (");
|
||||||
|
|
||||||
Value *statType = NULL;
|
Value *statType = NULL;
|
||||||
foreach_ptr(statType, stmt->stat_types)
|
int statTypeIndex = 0;
|
||||||
|
foreach_ptr_with_index(statType, stmt->stat_types, statTypeIndex)
|
||||||
{
|
{
|
||||||
appendStringInfoString(buf, strVal(statType));
|
if (statTypeIndex != 0)
|
||||||
|
|
||||||
if (statType != llast(stmt->stat_types))
|
|
||||||
{
|
{
|
||||||
appendStringInfoString(buf, ", ");
|
appendStringInfoString(buf, ", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
appendStringInfoString(buf, strVal(statType));
|
||||||
}
|
}
|
||||||
|
|
||||||
appendStringInfoString(buf, ")");
|
appendStringInfoString(buf, ")");
|
||||||
|
@ -240,7 +241,8 @@ AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
|
||||||
{
|
{
|
||||||
StatsElem *column = NULL;
|
StatsElem *column = NULL;
|
||||||
|
|
||||||
foreach_ptr(column, stmt->exprs)
|
int columnIndex = 0;
|
||||||
|
foreach_ptr_with_index(column, stmt->exprs, columnIndex)
|
||||||
{
|
{
|
||||||
if (!column->name)
|
if (!column->name)
|
||||||
{
|
{
|
||||||
|
@ -250,14 +252,13 @@ AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
|
||||||
"only simple column references are allowed in CREATE STATISTICS")));
|
"only simple column references are allowed in CREATE STATISTICS")));
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *columnName = quote_identifier(column->name);
|
if (columnIndex != 0)
|
||||||
|
|
||||||
appendStringInfoString(buf, columnName);
|
|
||||||
|
|
||||||
if (column != llast(stmt->exprs))
|
|
||||||
{
|
{
|
||||||
appendStringInfoString(buf, ", ");
|
appendStringInfoString(buf, ", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const char *columnName = quote_identifier(column->name);
|
||||||
|
appendStringInfoString(buf, columnName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -637,26 +637,24 @@ QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer)
|
||||||
{
|
{
|
||||||
StringInfo queryString = makeStringInfo();
|
StringInfo queryString = makeStringInfo();
|
||||||
StringInfo fragmentNamesArrayString = makeStringInfo();
|
StringInfo fragmentNamesArrayString = makeStringInfo();
|
||||||
int fragmentCount = 0;
|
|
||||||
NodePair *nodePair = &fragmentsTransfer->nodes;
|
NodePair *nodePair = &fragmentsTransfer->nodes;
|
||||||
WorkerNode *sourceNode = LookupNodeByNodeIdOrError(nodePair->sourceNodeId);
|
WorkerNode *sourceNode = LookupNodeByNodeIdOrError(nodePair->sourceNodeId);
|
||||||
|
|
||||||
appendStringInfoString(fragmentNamesArrayString, "ARRAY[");
|
appendStringInfoString(fragmentNamesArrayString, "ARRAY[");
|
||||||
|
|
||||||
DistributedResultFragment *fragment = NULL;
|
DistributedResultFragment *fragment = NULL;
|
||||||
foreach_ptr(fragment, fragmentsTransfer->fragmentList)
|
int fragmentIndex = 0;
|
||||||
|
foreach_ptr_with_index(fragment, fragmentsTransfer->fragmentList, fragmentIndex)
|
||||||
{
|
{
|
||||||
const char *fragmentName = fragment->resultId;
|
const char *fragmentName = fragment->resultId;
|
||||||
|
|
||||||
if (fragmentCount > 0)
|
if (fragmentIndex != 0)
|
||||||
{
|
{
|
||||||
appendStringInfoString(fragmentNamesArrayString, ",");
|
appendStringInfoString(fragmentNamesArrayString, ",");
|
||||||
}
|
}
|
||||||
|
|
||||||
appendStringInfoString(fragmentNamesArrayString,
|
appendStringInfoString(fragmentNamesArrayString,
|
||||||
quote_literal_cstr(fragmentName));
|
quote_literal_cstr(fragmentName));
|
||||||
|
|
||||||
fragmentCount++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
appendStringInfoString(fragmentNamesArrayString, "]::text[]");
|
appendStringInfoString(fragmentNamesArrayString, "]::text[]");
|
||||||
|
|
|
@ -580,20 +580,18 @@ static int
|
||||||
PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
|
PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
|
||||||
{
|
{
|
||||||
Var *partitionColumn = PartitionColumn(relationId, 0);
|
Var *partitionColumn = PartitionColumn(relationId, 0);
|
||||||
int partitionColumnIndex = 0;
|
|
||||||
|
|
||||||
const char *columnName = NULL;
|
const char *columnName = NULL;
|
||||||
foreach_ptr(columnName, columnNameList)
|
int columnNameIndex = 0;
|
||||||
|
foreach_ptr_with_index(columnName, columnNameList, columnNameIndex)
|
||||||
{
|
{
|
||||||
AttrNumber attrNumber = get_attnum(relationId, columnName);
|
AttrNumber attrNumber = get_attnum(relationId, columnName);
|
||||||
|
|
||||||
/* check whether this is the partition column */
|
/* check whether this is the partition column */
|
||||||
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
|
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
|
||||||
{
|
{
|
||||||
return partitionColumnIndex;
|
return columnNameIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
partitionColumnIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -727,15 +725,13 @@ static int
|
||||||
PartitionColumnIndex(List *insertTargetList, Var *partitionColumn)
|
PartitionColumnIndex(List *insertTargetList, Var *partitionColumn)
|
||||||
{
|
{
|
||||||
TargetEntry *insertTargetEntry = NULL;
|
TargetEntry *insertTargetEntry = NULL;
|
||||||
int targetEntryIndex = 0;
|
int insertTargetEntryIndex = 0;
|
||||||
foreach_ptr(insertTargetEntry, insertTargetList)
|
foreach_ptr_with_index(insertTargetEntry, insertTargetList, insertTargetEntryIndex)
|
||||||
{
|
{
|
||||||
if (insertTargetEntry->resno == partitionColumn->varattno)
|
if (insertTargetEntry->resno == partitionColumn->varattno)
|
||||||
{
|
{
|
||||||
return targetEntryIndex;
|
return insertTargetEntryIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
targetEntryIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -803,11 +799,11 @@ static void
|
||||||
WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
||||||
{
|
{
|
||||||
StringInfo projectedColumnsString = makeStringInfo();
|
StringInfo projectedColumnsString = makeStringInfo();
|
||||||
int entryIndex = 0;
|
|
||||||
TargetEntry *targetEntry = NULL;
|
TargetEntry *targetEntry = NULL;
|
||||||
foreach_ptr(targetEntry, projectedTargetEntries)
|
int targetEntryIndex = 0;
|
||||||
|
foreach_ptr_with_index(targetEntry, projectedTargetEntries, targetEntryIndex)
|
||||||
{
|
{
|
||||||
if (entryIndex != 0)
|
if (targetEntryIndex != 0)
|
||||||
{
|
{
|
||||||
appendStringInfoChar(projectedColumnsString, ',');
|
appendStringInfoChar(projectedColumnsString, ',');
|
||||||
}
|
}
|
||||||
|
@ -815,8 +811,6 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
||||||
char *columnName = targetEntry->resname;
|
char *columnName = targetEntry->resname;
|
||||||
Assert(columnName != NULL);
|
Assert(columnName != NULL);
|
||||||
appendStringInfoString(projectedColumnsString, quote_identifier(columnName));
|
appendStringInfoString(projectedColumnsString, quote_identifier(columnName));
|
||||||
|
|
||||||
entryIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Task *task = NULL;
|
Task *task = NULL;
|
||||||
|
|
|
@ -460,14 +460,13 @@ SortTupleStore(CitusScanState *scanState)
|
||||||
Oid *collations = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
|
Oid *collations = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
|
||||||
bool *nullsFirst = (bool *) palloc(numberOfSortKeys * sizeof(bool));
|
bool *nullsFirst = (bool *) palloc(numberOfSortKeys * sizeof(bool));
|
||||||
|
|
||||||
int sortKeyIndex = 0;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Iterate on the returning target list and generate the necessary information
|
* Iterate on the returning target list and generate the necessary information
|
||||||
* for sorting the tuples.
|
* for sorting the tuples.
|
||||||
*/
|
*/
|
||||||
TargetEntry *returningEntry = NULL;
|
TargetEntry *returningEntry = NULL;
|
||||||
foreach_ptr(returningEntry, targetList)
|
int returningEntryIndex = 0;
|
||||||
|
foreach_ptr_with_index(returningEntry, targetList, returningEntryIndex)
|
||||||
{
|
{
|
||||||
Oid sortop = InvalidOid;
|
Oid sortop = InvalidOid;
|
||||||
|
|
||||||
|
@ -477,12 +476,10 @@ SortTupleStore(CitusScanState *scanState)
|
||||||
&sortop, NULL, NULL,
|
&sortop, NULL, NULL,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
sortColIdx[sortKeyIndex] = sortKeyIndex + 1;
|
sortColIdx[returningEntryIndex] = returningEntryIndex + 1;
|
||||||
sortOperators[sortKeyIndex] = sortop;
|
sortOperators[returningEntryIndex] = sortop;
|
||||||
collations[sortKeyIndex] = exprCollation((Node *) returningEntry->expr);
|
collations[returningEntryIndex] = exprCollation((Node *) returningEntry->expr);
|
||||||
nullsFirst[sortKeyIndex] = false;
|
nullsFirst[returningEntryIndex] = false;
|
||||||
|
|
||||||
sortKeyIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Tuplesortstate *tuplesortstate =
|
Tuplesortstate *tuplesortstate =
|
||||||
|
|
|
@ -1477,7 +1477,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
|
||||||
{
|
{
|
||||||
Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
|
Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
|
||||||
TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
|
TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
|
||||||
int arrayIndex = 0;
|
|
||||||
|
|
||||||
shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
|
shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
|
||||||
shardIntervalArrayLength *
|
shardIntervalArrayLength *
|
||||||
|
@ -1493,7 +1492,8 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
|
||||||
sizeof(int));
|
sizeof(int));
|
||||||
|
|
||||||
HeapTuple shardTuple = NULL;
|
HeapTuple shardTuple = NULL;
|
||||||
foreach_ptr(shardTuple, distShardTupleList)
|
int shardTupleIndex = 0;
|
||||||
|
foreach_ptr_with_index(shardTuple, distShardTupleList, shardTupleIndex)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = TupleToShardInterval(shardTuple,
|
ShardInterval *shardInterval = TupleToShardInterval(shardTuple,
|
||||||
distShardTupleDesc,
|
distShardTupleDesc,
|
||||||
|
@ -1501,13 +1501,11 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
|
||||||
intervalTypeMod);
|
intervalTypeMod);
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||||
|
|
||||||
shardIntervalArray[arrayIndex] = CopyShardInterval(shardInterval);
|
shardIntervalArray[shardTupleIndex] = CopyShardInterval(shardInterval);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
heap_freetuple(shardTuple);
|
heap_freetuple(shardTuple);
|
||||||
|
|
||||||
arrayIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
table_close(distShardRelation, AccessShareLock);
|
table_close(distShardRelation, AccessShareLock);
|
||||||
|
@ -1603,7 +1601,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
|
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
|
||||||
int64 shardId = shardInterval->shardId;
|
int64 shardId = shardInterval->shardId;
|
||||||
int placementOffset = 0;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Enable quick lookups of this shard ID by adding it to ShardIdCacheHash
|
* Enable quick lookups of this shard ID by adding it to ShardIdCacheHash
|
||||||
|
@ -1631,10 +1628,10 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
|
||||||
GroupShardPlacement *placementArray = palloc0(numberOfPlacements *
|
GroupShardPlacement *placementArray = palloc0(numberOfPlacements *
|
||||||
sizeof(GroupShardPlacement));
|
sizeof(GroupShardPlacement));
|
||||||
GroupShardPlacement *srcPlacement = NULL;
|
GroupShardPlacement *srcPlacement = NULL;
|
||||||
foreach_ptr(srcPlacement, placementList)
|
int srcPlacementIndex = 0;
|
||||||
|
foreach_ptr_with_index(srcPlacement, placementList, srcPlacementIndex)
|
||||||
{
|
{
|
||||||
placementArray[placementOffset] = *srcPlacement;
|
placementArray[srcPlacementIndex] = *srcPlacement;
|
||||||
placementOffset++;
|
|
||||||
}
|
}
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
|
|
@ -754,7 +754,6 @@ NodeListInsertCommand(List *workerNodeList)
|
||||||
{
|
{
|
||||||
StringInfo nodeListInsertCommand = makeStringInfo();
|
StringInfo nodeListInsertCommand = makeStringInfo();
|
||||||
int workerCount = list_length(workerNodeList);
|
int workerCount = list_length(workerNodeList);
|
||||||
int processedWorkerNodeCount = 0;
|
|
||||||
Oid primaryRole = PrimaryNodeRoleId();
|
Oid primaryRole = PrimaryNodeRoleId();
|
||||||
|
|
||||||
/* if there are no workers, return NULL */
|
/* if there are no workers, return NULL */
|
||||||
|
@ -779,7 +778,8 @@ NodeListInsertCommand(List *workerNodeList)
|
||||||
|
|
||||||
/* iterate over the worker nodes, add the values */
|
/* iterate over the worker nodes, add the values */
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
int workerNodeIndex = 0;
|
||||||
|
foreach_ptr_with_index(workerNode, workerNodeList, workerNodeIndex)
|
||||||
{
|
{
|
||||||
char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
|
char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
|
||||||
char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
|
char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
|
||||||
|
@ -790,6 +790,11 @@ NodeListInsertCommand(List *workerNodeList)
|
||||||
Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
|
Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
|
||||||
char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
|
char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
|
||||||
|
|
||||||
|
if (workerNodeIndex != 0)
|
||||||
|
{
|
||||||
|
appendStringInfo(nodeListInsertCommand, ",");
|
||||||
|
}
|
||||||
|
|
||||||
appendStringInfo(nodeListInsertCommand,
|
appendStringInfo(nodeListInsertCommand,
|
||||||
"(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)",
|
"(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)",
|
||||||
workerNode->nodeId,
|
workerNode->nodeId,
|
||||||
|
@ -803,12 +808,6 @@ NodeListInsertCommand(List *workerNodeList)
|
||||||
nodeRoleString,
|
nodeRoleString,
|
||||||
quote_literal_cstr(workerNode->nodeCluster),
|
quote_literal_cstr(workerNode->nodeCluster),
|
||||||
shouldHaveShards);
|
shouldHaveShards);
|
||||||
|
|
||||||
processedWorkerNodeCount++;
|
|
||||||
if (processedWorkerNodeCount != workerCount)
|
|
||||||
{
|
|
||||||
appendStringInfo(nodeListInsertCommand, ",");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nodeListInsertCommand->data;
|
return nodeListInsertCommand->data;
|
||||||
|
@ -963,7 +962,8 @@ ShardListInsertCommand(List *shardIntervalList)
|
||||||
"WITH shard_data(relationname, shardid, storagetype, "
|
"WITH shard_data(relationname, shardid, storagetype, "
|
||||||
"shardminvalue, shardmaxvalue) AS (VALUES ");
|
"shardminvalue, shardmaxvalue) AS (VALUES ");
|
||||||
|
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
int shardIntervalIndex = 0;
|
||||||
|
foreach_ptr_with_index(shardInterval, shardIntervalList, shardIntervalIndex)
|
||||||
{
|
{
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
Oid distributedRelationId = shardInterval->relationId;
|
Oid distributedRelationId = shardInterval->relationId;
|
||||||
|
@ -992,6 +992,11 @@ ShardListInsertCommand(List *shardIntervalList)
|
||||||
appendStringInfo(maxHashToken, "NULL");
|
appendStringInfo(maxHashToken, "NULL");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (shardIntervalIndex != 0)
|
||||||
|
{
|
||||||
|
appendStringInfo(insertShardCommand, ", ");
|
||||||
|
}
|
||||||
|
|
||||||
appendStringInfo(insertShardCommand,
|
appendStringInfo(insertShardCommand,
|
||||||
"(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
|
"(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
|
||||||
quote_literal_cstr(qualifiedRelationName),
|
quote_literal_cstr(qualifiedRelationName),
|
||||||
|
@ -999,11 +1004,6 @@ ShardListInsertCommand(List *shardIntervalList)
|
||||||
shardInterval->storageType,
|
shardInterval->storageType,
|
||||||
minHashToken->data,
|
minHashToken->data,
|
||||||
maxHashToken->data);
|
maxHashToken->data);
|
||||||
|
|
||||||
if (llast(shardIntervalList) != shardInterval)
|
|
||||||
{
|
|
||||||
appendStringInfo(insertShardCommand, ", ");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
appendStringInfo(insertShardCommand, ") ");
|
appendStringInfo(insertShardCommand, ") ");
|
||||||
|
|
|
@ -280,12 +280,12 @@ CheckRebalanceStateInvariants(const RebalanceState *state)
|
||||||
{
|
{
|
||||||
NodeFillState *fillState = NULL;
|
NodeFillState *fillState = NULL;
|
||||||
NodeFillState *prevFillState = NULL;
|
NodeFillState *prevFillState = NULL;
|
||||||
int fillStateIndex = 0;
|
|
||||||
int fillStateLength = list_length(state->fillStateListAsc);
|
int fillStateLength = list_length(state->fillStateListAsc);
|
||||||
|
|
||||||
Assert(state != NULL);
|
Assert(state != NULL);
|
||||||
Assert(list_length(state->fillStateListAsc) == list_length(state->fillStateListDesc));
|
Assert(list_length(state->fillStateListAsc) == list_length(state->fillStateListDesc));
|
||||||
foreach_ptr(fillState, state->fillStateListAsc)
|
int fillStateIndex = 0;
|
||||||
|
foreach_ptr_with_index(fillState, state->fillStateListAsc, fillStateIndex)
|
||||||
{
|
{
|
||||||
float4 totalCost = 0;
|
float4 totalCost = 0;
|
||||||
ShardCost *shardCost = NULL;
|
ShardCost *shardCost = NULL;
|
||||||
|
@ -333,7 +333,6 @@ CheckRebalanceStateInvariants(const RebalanceState *state)
|
||||||
1000);
|
1000);
|
||||||
|
|
||||||
prevFillState = fillState;
|
prevFillState = fillState;
|
||||||
fillStateIndex++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,6 @@ Datum
|
||||||
load_shard_id_array(PG_FUNCTION_ARGS)
|
load_shard_id_array(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = PG_GETARG_OID(0);
|
Oid distributedTableId = PG_GETARG_OID(0);
|
||||||
int shardIdIndex = 0;
|
|
||||||
Oid shardIdTypeId = INT8OID;
|
Oid shardIdTypeId = INT8OID;
|
||||||
|
|
||||||
List *shardList = LoadShardIntervalList(distributedTableId);
|
List *shardList = LoadShardIntervalList(distributedTableId);
|
||||||
|
@ -71,12 +70,12 @@ load_shard_id_array(PG_FUNCTION_ARGS)
|
||||||
Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));
|
Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
foreach_ptr(shardInterval, shardList)
|
int shardIntervalIndex = 0;
|
||||||
|
foreach_ptr_with_index(shardInterval, shardList, shardIntervalIndex)
|
||||||
{
|
{
|
||||||
Datum shardIdDatum = Int64GetDatum(shardInterval->shardId);
|
Datum shardIdDatum = Int64GetDatum(shardInterval->shardId);
|
||||||
|
|
||||||
shardIdDatumArray[shardIdIndex] = shardIdDatum;
|
shardIdDatumArray[shardIntervalIndex] = shardIdDatum;
|
||||||
shardIdIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ArrayType *shardIdArrayType = DatumArrayToArrayType(shardIdDatumArray, shardIdCount,
|
ArrayType *shardIdArrayType = DatumArrayToArrayType(shardIdDatumArray, shardIdCount,
|
||||||
|
@ -122,7 +121,6 @@ load_shard_placement_array(PG_FUNCTION_ARGS)
|
||||||
int64 shardId = PG_GETARG_INT64(0);
|
int64 shardId = PG_GETARG_INT64(0);
|
||||||
bool onlyActive = PG_GETARG_BOOL(1);
|
bool onlyActive = PG_GETARG_BOOL(1);
|
||||||
List *placementList = NIL;
|
List *placementList = NIL;
|
||||||
int placementIndex = 0;
|
|
||||||
Oid placementTypeId = TEXTOID;
|
Oid placementTypeId = TEXTOID;
|
||||||
StringInfo placementInfo = makeStringInfo();
|
StringInfo placementInfo = makeStringInfo();
|
||||||
|
|
||||||
|
@ -141,13 +139,13 @@ load_shard_placement_array(PG_FUNCTION_ARGS)
|
||||||
Datum *placementDatumArray = palloc0(placementCount * sizeof(Datum));
|
Datum *placementDatumArray = palloc0(placementCount * sizeof(Datum));
|
||||||
|
|
||||||
ShardPlacement *placement = NULL;
|
ShardPlacement *placement = NULL;
|
||||||
foreach_ptr(placement, placementList)
|
int placementIndex = 0;
|
||||||
|
foreach_ptr_with_index(placement, placementList, placementIndex)
|
||||||
{
|
{
|
||||||
appendStringInfo(placementInfo, "%s:%d", placement->nodeName,
|
appendStringInfo(placementInfo, "%s:%d", placement->nodeName,
|
||||||
placement->nodePort);
|
placement->nodePort);
|
||||||
|
|
||||||
placementDatumArray[placementIndex] = CStringGetTextDatum(placementInfo->data);
|
placementDatumArray[placementIndex] = CStringGetTextDatum(placementInfo->data);
|
||||||
placementIndex++;
|
|
||||||
resetStringInfo(placementInfo);
|
resetStringInfo(placementInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,6 @@ master_metadata_snapshot(PG_FUNCTION_ARGS)
|
||||||
List *dropSnapshotCommands = MetadataDropCommands();
|
List *dropSnapshotCommands = MetadataDropCommands();
|
||||||
List *createSnapshotCommands = MetadataCreateCommands();
|
List *createSnapshotCommands = MetadataCreateCommands();
|
||||||
List *snapshotCommandList = NIL;
|
List *snapshotCommandList = NIL;
|
||||||
int snapshotCommandIndex = 0;
|
|
||||||
Oid ddlCommandTypeId = TEXTOID;
|
Oid ddlCommandTypeId = TEXTOID;
|
||||||
|
|
||||||
snapshotCommandList = list_concat(snapshotCommandList, dropSnapshotCommands);
|
snapshotCommandList = list_concat(snapshotCommandList, dropSnapshotCommands);
|
||||||
|
@ -54,12 +53,14 @@ master_metadata_snapshot(PG_FUNCTION_ARGS)
|
||||||
Datum *snapshotCommandDatumArray = palloc0(snapshotCommandCount * sizeof(Datum));
|
Datum *snapshotCommandDatumArray = palloc0(snapshotCommandCount * sizeof(Datum));
|
||||||
|
|
||||||
const char *metadataSnapshotCommand = NULL;
|
const char *metadataSnapshotCommand = NULL;
|
||||||
foreach_ptr(metadataSnapshotCommand, snapshotCommandList)
|
int metadataSnapshotCommandIndex = 0;
|
||||||
|
foreach_ptr_with_index(metadataSnapshotCommand, snapshotCommandList,
|
||||||
|
metadataSnapshotCommandIndex)
|
||||||
{
|
{
|
||||||
Datum metadataSnapshotCommandDatum = CStringGetTextDatum(metadataSnapshotCommand);
|
Datum metadataSnapshotCommandDatum = CStringGetTextDatum(metadataSnapshotCommand);
|
||||||
|
|
||||||
snapshotCommandDatumArray[snapshotCommandIndex] = metadataSnapshotCommandDatum;
|
snapshotCommandDatumArray[metadataSnapshotCommandIndex] =
|
||||||
snapshotCommandIndex++;
|
metadataSnapshotCommandDatum;
|
||||||
}
|
}
|
||||||
|
|
||||||
ArrayType *snapshotCommandArrayType = DatumArrayToArrayType(snapshotCommandDatumArray,
|
ArrayType *snapshotCommandArrayType = DatumArrayToArrayType(snapshotCommandDatumArray,
|
||||||
|
|
|
@ -207,7 +207,6 @@ MakeTextPartitionExpression(Oid distributedTableId, text *value)
|
||||||
static ArrayType *
|
static ArrayType *
|
||||||
PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList)
|
PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList)
|
||||||
{
|
{
|
||||||
int shardIdIndex = 0;
|
|
||||||
Oid shardIdTypeId = INT8OID;
|
Oid shardIdTypeId = INT8OID;
|
||||||
Index tableId = 1;
|
Index tableId = 1;
|
||||||
|
|
||||||
|
@ -218,12 +217,12 @@ PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList)
|
||||||
Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));
|
Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
foreach_ptr(shardInterval, shardList)
|
int shardIntervalIndex = 0;
|
||||||
|
foreach_ptr_with_index(shardInterval, shardList, shardIntervalIndex)
|
||||||
{
|
{
|
||||||
Datum shardIdDatum = Int64GetDatum(shardInterval->shardId);
|
Datum shardIdDatum = Int64GetDatum(shardInterval->shardId);
|
||||||
|
|
||||||
shardIdDatumArray[shardIdIndex] = shardIdDatum;
|
shardIdDatumArray[shardIntervalIndex] = shardIdDatum;
|
||||||
shardIdIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ArrayType *shardIdArrayType = DatumArrayToArrayType(shardIdDatumArray, shardIdCount,
|
ArrayType *shardIdArrayType = DatumArrayToArrayType(shardIdDatumArray, shardIdCount,
|
||||||
|
|
|
@ -669,9 +669,10 @@ WaitsForToString(List *waitsFor)
|
||||||
StringInfo transactionIdStr = makeStringInfo();
|
StringInfo transactionIdStr = makeStringInfo();
|
||||||
|
|
||||||
TransactionNode *waitingNode = NULL;
|
TransactionNode *waitingNode = NULL;
|
||||||
foreach_ptr(waitingNode, waitsFor)
|
int waitingNodeIndex = 0;
|
||||||
|
foreach_ptr_with_index(waitingNode, waitsFor, waitingNodeIndex)
|
||||||
{
|
{
|
||||||
if (transactionIdStr->len != 0)
|
if (waitingNodeIndex != 0)
|
||||||
{
|
{
|
||||||
appendStringInfoString(transactionIdStr, ",");
|
appendStringInfoString(transactionIdStr, ",");
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,17 +182,17 @@ get_colocated_shard_array(PG_FUNCTION_ARGS)
|
||||||
int colocatedShardCount = list_length(colocatedShardList);
|
int colocatedShardCount = list_length(colocatedShardList);
|
||||||
Datum *colocatedShardsDatumArray = palloc0(colocatedShardCount * sizeof(Datum));
|
Datum *colocatedShardsDatumArray = palloc0(colocatedShardCount * sizeof(Datum));
|
||||||
Oid arrayTypeId = OIDOID;
|
Oid arrayTypeId = OIDOID;
|
||||||
int colocatedShardIndex = 0;
|
|
||||||
|
|
||||||
ShardInterval *colocatedShardInterval = NULL;
|
ShardInterval *colocatedShardInterval = NULL;
|
||||||
foreach_ptr(colocatedShardInterval, colocatedShardList)
|
int colocatedShardIntervalIndex = 0;
|
||||||
|
foreach_ptr_with_index(colocatedShardInterval, colocatedShardList,
|
||||||
|
colocatedShardIntervalIndex)
|
||||||
{
|
{
|
||||||
uint64 colocatedShardId = colocatedShardInterval->shardId;
|
uint64 colocatedShardId = colocatedShardInterval->shardId;
|
||||||
|
|
||||||
Datum colocatedShardDatum = Int64GetDatum(colocatedShardId);
|
Datum colocatedShardDatum = Int64GetDatum(colocatedShardId);
|
||||||
|
|
||||||
colocatedShardsDatumArray[colocatedShardIndex] = colocatedShardDatum;
|
colocatedShardsDatumArray[colocatedShardIntervalIndex] = colocatedShardDatum;
|
||||||
colocatedShardIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ArrayType *colocatedShardsArrayType = DatumArrayToArrayType(colocatedShardsDatumArray,
|
ArrayType *colocatedShardsArrayType = DatumArrayToArrayType(colocatedShardsDatumArray,
|
||||||
|
|
|
@ -41,11 +41,10 @@ SortList(List *pointerList, int (*comparisonFunction)(const void *, const void *
|
||||||
void **array = (void **) palloc0(arraySize * sizeof(void *));
|
void **array = (void **) palloc0(arraySize * sizeof(void *));
|
||||||
|
|
||||||
void *pointer = NULL;
|
void *pointer = NULL;
|
||||||
foreach_ptr(pointer, pointerList)
|
int pointerIndex = 0;
|
||||||
|
foreach_ptr_with_index(pointer, pointerList, pointerIndex)
|
||||||
{
|
{
|
||||||
array[arrayIndex] = pointer;
|
array[pointerIndex] = pointer;
|
||||||
|
|
||||||
arrayIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* sort the array of pointers using the comparison function */
|
/* sort the array of pointers using the comparison function */
|
||||||
|
@ -77,13 +76,12 @@ PointerArrayFromList(List *pointerList)
|
||||||
{
|
{
|
||||||
int pointerCount = list_length(pointerList);
|
int pointerCount = list_length(pointerList);
|
||||||
void **pointerArray = (void **) palloc0(pointerCount * sizeof(void *));
|
void **pointerArray = (void **) palloc0(pointerCount * sizeof(void *));
|
||||||
int pointerIndex = 0;
|
|
||||||
|
|
||||||
void *pointer = NULL;
|
void *pointer = NULL;
|
||||||
foreach_ptr(pointer, pointerList)
|
int pointerIndex = 0;
|
||||||
|
foreach_ptr_with_index(pointer, pointerList, pointerIndex)
|
||||||
{
|
{
|
||||||
pointerArray[pointerIndex] = pointer;
|
pointerArray[pointerIndex] = pointer;
|
||||||
pointerIndex += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pointerArray;
|
return pointerArray;
|
||||||
|
@ -188,15 +186,14 @@ StringJoin(List *stringList, char delimiter)
|
||||||
StringInfo joinedString = makeStringInfo();
|
StringInfo joinedString = makeStringInfo();
|
||||||
|
|
||||||
const char *command = NULL;
|
const char *command = NULL;
|
||||||
int curIndex = 0;
|
int commandIndex = 0;
|
||||||
foreach_ptr(command, stringList)
|
foreach_ptr_with_index(command, stringList, commandIndex)
|
||||||
{
|
{
|
||||||
if (curIndex > 0)
|
if (commandIndex != 0)
|
||||||
{
|
{
|
||||||
appendStringInfoChar(joinedString, delimiter);
|
appendStringInfoChar(joinedString, delimiter);
|
||||||
}
|
}
|
||||||
appendStringInfoString(joinedString, command);
|
appendStringInfoString(joinedString, command);
|
||||||
curIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return joinedString->data;
|
return joinedString->data;
|
||||||
|
@ -212,14 +209,12 @@ List *
|
||||||
ListTake(List *pointerList, int size)
|
ListTake(List *pointerList, int size)
|
||||||
{
|
{
|
||||||
List *result = NIL;
|
List *result = NIL;
|
||||||
int listIndex = 0;
|
|
||||||
|
|
||||||
void *pointer = NULL;
|
void *pointer = NULL;
|
||||||
foreach_ptr(pointer, pointerList)
|
foreach_ptr(pointer, pointerList)
|
||||||
{
|
{
|
||||||
result = lappend(result, pointer);
|
result = lappend(result, pointer);
|
||||||
listIndex++;
|
if (list_length(result) >= size)
|
||||||
if (listIndex >= size)
|
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,8 +216,6 @@ static void
|
||||||
LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
|
LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
|
||||||
{
|
{
|
||||||
StringInfo lockCommand = makeStringInfo();
|
StringInfo lockCommand = makeStringInfo();
|
||||||
int processedShardIntervalCount = 0;
|
|
||||||
int totalShardIntervalCount = list_length(shardIntervalList);
|
|
||||||
WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode();
|
WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode();
|
||||||
int connectionFlags = 0;
|
int connectionFlags = 0;
|
||||||
const char *superuser = CurrentUserName();
|
const char *superuser = CurrentUserName();
|
||||||
|
@ -225,17 +223,16 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
|
||||||
appendStringInfo(lockCommand, "SELECT lock_shard_resources(%d, ARRAY[", lockmode);
|
appendStringInfo(lockCommand, "SELECT lock_shard_resources(%d, ARRAY[", lockmode);
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
int shardIntervalIndex = 0;
|
||||||
|
foreach_ptr_with_index(shardInterval, shardIntervalList, shardIntervalIndex)
|
||||||
{
|
{
|
||||||
int64 shardId = shardInterval->shardId;
|
if (shardIntervalIndex != 0)
|
||||||
|
|
||||||
appendStringInfo(lockCommand, "%lu", shardId);
|
|
||||||
|
|
||||||
processedShardIntervalCount++;
|
|
||||||
if (processedShardIntervalCount != totalShardIntervalCount)
|
|
||||||
{
|
{
|
||||||
appendStringInfo(lockCommand, ", ");
|
appendStringInfo(lockCommand, ", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64 shardId = shardInterval->shardId;
|
||||||
|
appendStringInfo(lockCommand, "%lu", shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
appendStringInfo(lockCommand, "])");
|
appendStringInfo(lockCommand, "])");
|
||||||
|
@ -303,8 +300,6 @@ void
|
||||||
LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList)
|
LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList)
|
||||||
{
|
{
|
||||||
StringInfo lockCommand = makeStringInfo();
|
StringInfo lockCommand = makeStringInfo();
|
||||||
int processedShardIntervalCount = 0;
|
|
||||||
int totalShardIntervalCount = list_length(shardIntervalList);
|
|
||||||
|
|
||||||
if (list_length(shardIntervalList) == 0)
|
if (list_length(shardIntervalList) == 0)
|
||||||
{
|
{
|
||||||
|
@ -314,17 +309,16 @@ LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList)
|
||||||
appendStringInfo(lockCommand, "SELECT lock_shard_metadata(%d, ARRAY[", lockmode);
|
appendStringInfo(lockCommand, "SELECT lock_shard_metadata(%d, ARRAY[", lockmode);
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
int shardIntervalIndex = 0;
|
||||||
|
foreach_ptr_with_index(shardInterval, shardIntervalList, shardIntervalIndex)
|
||||||
{
|
{
|
||||||
int64 shardId = shardInterval->shardId;
|
if (shardIntervalIndex != 0)
|
||||||
|
|
||||||
appendStringInfo(lockCommand, "%lu", shardId);
|
|
||||||
|
|
||||||
processedShardIntervalCount++;
|
|
||||||
if (processedShardIntervalCount != totalShardIntervalCount)
|
|
||||||
{
|
{
|
||||||
appendStringInfo(lockCommand, ", ");
|
appendStringInfo(lockCommand, ", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64 shardId = shardInterval->shardId;
|
||||||
|
appendStringInfo(lockCommand, "%lu", shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
appendStringInfo(lockCommand, "])");
|
appendStringInfo(lockCommand, "])");
|
||||||
|
|
|
@ -33,6 +33,13 @@ typedef struct ListCellAndListWrapper
|
||||||
ListCell *listCell;
|
ListCell *listCell;
|
||||||
} ListCellAndListWrapper;
|
} ListCellAndListWrapper;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct ListCellAndIndex
|
||||||
|
{
|
||||||
|
ListCell *listCell;
|
||||||
|
int index;
|
||||||
|
} ListCellAndIndex;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* foreach_ptr -
|
* foreach_ptr -
|
||||||
* a convenience macro which loops through a pointer list without needing a
|
* a convenience macro which loops through a pointer list without needing a
|
||||||
|
@ -52,9 +59,17 @@ typedef struct ListCellAndListWrapper
|
||||||
for (ListCell *(var ## CellDoNotUse) = list_head(l); \
|
for (ListCell *(var ## CellDoNotUse) = list_head(l); \
|
||||||
(var ## CellDoNotUse) != NULL && \
|
(var ## CellDoNotUse) != NULL && \
|
||||||
(((var) = lfirst(var ## CellDoNotUse)) || true); \
|
(((var) = lfirst(var ## CellDoNotUse)) || true); \
|
||||||
var ## CellDoNotUse = lnext_compat(l, var ## CellDoNotUse))
|
(var ## CellDoNotUse) = lnext_compat(l, var ## CellDoNotUse))
|
||||||
|
|
||||||
|
|
||||||
|
#define foreach_ptr_with_index(var, l, index) \
|
||||||
|
(index) = 0; \
|
||||||
|
for (ListCell *(var ## CellDoNotUse) = list_head(l); \
|
||||||
|
(var ## CellDoNotUse) != NULL && \
|
||||||
|
(((var) = lfirst(var ## CellDoNotUse)) || true); \
|
||||||
|
(var ## CellDoNotUse) = lnext_compat(l, var ## CellDoNotUse), \
|
||||||
|
(index)++)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* foreach_int -
|
* foreach_int -
|
||||||
* a convenience macro which loops through an int list without needing a
|
* a convenience macro which loops through an int list without needing a
|
||||||
|
@ -113,6 +128,7 @@ typedef struct ListCellAndListWrapper
|
||||||
#define foreach_ptr_append(var, l) foreach_ptr(var, l)
|
#define foreach_ptr_append(var, l) foreach_ptr(var, l)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
/* utility functions declaration shared within this module */
|
/* utility functions declaration shared within this module */
|
||||||
extern List * SortList(List *pointerList,
|
extern List * SortList(List *pointerList,
|
||||||
int (*ComparisonFunction)(const void *, const void *));
|
int (*ComparisonFunction)(const void *, const void *));
|
||||||
|
|
Loading…
Reference in New Issue