BuildCitusTableCacheEntry

multi-column-distribution
Jelte Fennema 2021-06-08 15:53:14 +02:00
parent 361acde19d
commit f596827e7e
4 changed files with 62 additions and 4 deletions

View File

@ -518,8 +518,14 @@ CreateDistributedTable(Oid relationId, List *distributionColumnList,
* ColocationIdForNewTable assumes caller acquires lock on relationId. In our case, * ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
* our caller already acquired lock on relationId. * our caller already acquired lock on relationId.
*/ */
uint32 colocationId = ColocationIdForNewTable(relationId, linitial(
distributionColumnList), /* TODO: Use full column list */
Var *distributionColumn = NULL;
if (list_length(distributionColumnList) > 0)
{
distributionColumn = linitial(distributionColumnList);
}
uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn,
distributionMethod, replicationModel, distributionMethod, replicationModel,
shardCount, shardCountIsStrict, shardCount, shardCountIsStrict,
colocateWithTableName, colocateWithTableName,
@ -1222,8 +1228,14 @@ EnsureRelationCanBeDistributed(Oid relationId, List *distributionColumnList,
} }
} }
/* TODO: Use full column list */
distributionColumn = NULL;
if (list_length(distributionColumnList) > 0)
{
distributionColumn = linitial(distributionColumnList);
}
ErrorIfUnsupportedConstraint(relation, distributionMethod, replicationModel, ErrorIfUnsupportedConstraint(relation, distributionMethod, replicationModel,
linitial(distributionColumnList), colocationId); distributionColumn, colocationId);
ErrorIfUnsupportedPolicy(relation); ErrorIfUnsupportedPolicy(relation);

View File

@ -1240,11 +1240,39 @@ BuildCitusTableCacheEntry(Oid relationId)
cacheEntry->partitionMethod = datumArray[Anum_pg_dist_partition_partmethod - 1]; cacheEntry->partitionMethod = datumArray[Anum_pg_dist_partition_partmethod - 1];
Datum partitionKeyDatum = datumArray[Anum_pg_dist_partition_partkey - 1]; Datum partitionKeyDatum = datumArray[Anum_pg_dist_partition_partkey - 1];
bool partitionKeyIsNull = isNullArray[Anum_pg_dist_partition_partkey - 1]; bool partitionKeyIsNull = isNullArray[Anum_pg_dist_partition_partkey - 1];
Datum partitionKeysDatum = datumArray[Anum_pg_dist_partition_partkeys - 1];
bool partitionKeysIsNull = isNullArray[Anum_pg_dist_partition_partkeys - 1];
/* note that for reference tables partitionKeyisNull is true */ /* note that for reference tables partitionKeyIsNull is true */
if (!partitionKeyIsNull) if (!partitionKeyIsNull)
{ {
oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
if (!partitionKeysIsNull)
{
ArrayType *partitionKeysArray = DatumGetArrayTypeP(partitionKeysDatum);
int partitionKeysCount = ArrayObjectCount(partitionKeysArray);
Datum *partitionKeysArrayDatum = DeconstructArrayObject(partitionKeysArray);
for (int i = 0; i < partitionKeysCount; i++)
{
/* get the string representation of the partition column Var */
char *partitionKeyString = TextDatumGetCString(
partitionKeysArrayDatum[i]);
/* convert the string to a Node and ensure it is a Var */
Node *partitionNode = stringToNode(partitionKeyString);
Assert(IsA(partitionNode, Var));
cacheEntry->partitionKeyStrings =
lappend(cacheEntry->partitionKeyStrings, partitionKeyString);
cacheEntry->partitionColumns =
lappend(cacheEntry->partitionColumns, partitionNode);
}
/* TODO: uncomment once fixed
* cacheEntry->partitionColumn = linitial(cacheEntry->partitionColumns);
* cacheEntry->partitionKeyString = linitial(cacheEntry->partitionKeyStrings);
*/
}
/* get the string representation of the partition column Var */ /* get the string representation of the partition column Var */
cacheEntry->partitionKeyString = TextDatumGetCString(partitionKeyDatum); cacheEntry->partitionKeyString = TextDatumGetCString(partitionKeyDatum);
@ -1255,6 +1283,7 @@ BuildCitusTableCacheEntry(Oid relationId)
cacheEntry->partitionColumn = (Var *) partitionNode; cacheEntry->partitionColumn = (Var *) partitionNode;
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
} }
else else
@ -3483,6 +3512,18 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
return; return;
} }
if (cacheEntry->partitionKeyStrings != NIL)
{
list_free(cacheEntry->partitionKeyStrings);
cacheEntry->partitionKeyStrings = NIL;
}
if (cacheEntry->partitionColumns != NIL)
{
list_free(cacheEntry->partitionColumns);
cacheEntry->partitionColumns = NIL;
}
/* clean up ShardIdCacheHash */ /* clean up ShardIdCacheHash */
RemoveStaleShardIdCacheEntries(cacheEntry); RemoveStaleShardIdCacheEntries(cacheEntry);

View File

@ -1756,6 +1756,9 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
/* set partkey column to NULL for reference tables */ /* set partkey column to NULL for reference tables */
if (distributionMethod != DISTRIBUTE_BY_NONE) if (distributionMethod != DISTRIBUTE_BY_NONE)
{ {
Assert(list_length(distributionColumnList) > 0);
Assert(linitial(distributionColumnList) != NULL);
Datum *distributionColumnDatumArray = Datum *distributionColumnDatumArray =
palloc0(list_length(distributionColumnList) * sizeof(Datum)); palloc0(list_length(distributionColumnList) * sizeof(Datum));

View File

@ -60,6 +60,8 @@ typedef struct
/* pg_dist_partition metadata for this table */ /* pg_dist_partition metadata for this table */
char *partitionKeyString; char *partitionKeyString;
Var *partitionColumn; Var *partitionColumn;
List *partitionKeyStrings;
List *partitionColumns;
char partitionMethod; char partitionMethod;
uint32 colocationId; uint32 colocationId;
char replicationModel; char replicationModel;