Refactor CreateDistributedTable to take column name

pull/5723/head
Ahmet Gedemenli 2022-02-17 18:07:18 +03:00
parent 6d16e9ba56
commit 2bc6a00408
9 changed files with 67 additions and 147 deletions

View File

@ -1059,7 +1059,8 @@ CreateTableConversion(TableConversionParameters *params)
}
relation_close(relation, NoLock);
con->distributionKey =
BuildDistributionKeyFromColumnName(relation, con->distributionColumn);
BuildDistributionKeyFromColumnName(con->relationId, con->distributionColumn,
NoLock);
con->originalAccessMethod = NULL;
if (!PartitionedTable(con->relationId) && !IsForeignTable(con->relationId))
@ -1175,6 +1176,9 @@ CreateDistributedTableLike(TableConversionState *con)
newShardCount = con->shardCount;
}
char *distributionColumnName =
ColumnToColumnName(con->newRelationId, (Node *) newDistributionKey);
Oid originalRelationId = con->relationId;
if (con->originalDistributionKey != NULL && PartitionTable(originalRelationId))
{
@ -1190,16 +1194,13 @@ CreateDistributedTableLike(TableConversionState *con)
*/
Oid parentRelationId = PartitionParentOid(originalRelationId);
Var *parentDistKey = DistPartitionKeyOrError(parentRelationId);
char *parentDistKeyColumnName =
ColumnToColumnName(parentRelationId, nodeToString(parentDistKey));
newDistributionKey =
FindColumnWithNameOnTargetRelation(parentRelationId, parentDistKeyColumnName,
con->newRelationId);
distributionColumnName =
ColumnToColumnName(parentRelationId, (Node *) parentDistKey);
}
char partitionMethod = PartitionMethod(con->relationId);
CreateDistributedTable(con->newRelationId, newDistributionKey, partitionMethod,
CreateDistributedTable(con->newRelationId, distributionColumnName, partitionMethod,
newShardCount, true, newColocateWith, false);
}

View File

@ -159,30 +159,14 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
char *colocateWithTableName = NULL;
bool viaDeprecatedAPI = true;
/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* backends manipulating this relation.
*/
Relation relation = try_relation_open(relationId, ExclusiveLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("could not create distributed table: "
"relation does not exist")));
}
char *distributionColumnName = text_to_cstring(distributionColumnText);
Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
distributionColumnName);
Assert(distributionColumn != NULL);
Assert(distributionColumnName != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
relation_close(relation, NoLock);
PG_RETURN_VOID();
}
@ -249,9 +233,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
relation_close(relation, NoLock);
char *distributionColumnName = text_to_cstring(distributionColumnText);
Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
distributionColumnName);
Assert(distributionColumn != NULL);
Assert(distributionColumnName != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
@ -261,7 +244,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
shardCount, MAX_SHARD_COUNT)));
}
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName,
viaDeprecatedAPI);
@ -281,7 +264,7 @@ create_reference_table(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
char *colocateWithTableName = NULL;
Var *distributionColumn = NULL;
char *distributionColumnName = NULL;
bool viaDeprecatedAPI = false;
@ -317,7 +300,7 @@ create_reference_table(PG_FUNCTION_ARGS)
errdetail("There are no active worker nodes.")));
}
CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
CreateDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_NONE,
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
PG_RETURN_VOID();
}
@ -385,9 +368,10 @@ EnsureRelationExists(Oid relationId)
* day, once we deprecate master_create_distributed_table completely.
*/
void
CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName, bool viaDeprecatedAPI)
CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName,
bool viaDeprecatedAPI)
{
/*
* EnsureTableNotDistributed errors out when relation is a citus table but
@ -443,6 +427,8 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
}
LockRelationOid(relationId, ExclusiveLock);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
@ -463,22 +449,9 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
colocateWithTableName,
viaDeprecatedAPI);
/*
* Due to dropping columns, the parent's distribution key may not match the
* partition's distribution key. The input distributionColumn belongs to
* the parent. That's why we override the distribution column of partitions
* here. See issue #5123 for details.
*/
if (PartitionTable(relationId))
{
Oid parentRelationId = PartitionParentOid(relationId);
char *distributionColumnName =
ColumnToColumnName(parentRelationId, nodeToString(distributionColumn));
distributionColumn =
FindColumnWithNameOnTargetRelation(parentRelationId, distributionColumnName,
relationId);
}
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributionColumnName,
ExclusiveLock);
/*
* ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
@ -567,7 +540,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
foreach_oid(partitionRelationId, partitionList)
{
CreateDistributedTable(partitionRelationId, distributionColumn,
CreateDistributedTable(partitionRelationId, distributionColumnName,
distributionMethod, shardCount, false,
parentRelationName, viaDeprecatedAPI);
}

View File

@ -378,6 +378,8 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
}
Var *parentDistributionColumn = DistPartitionKeyOrError(parentRelationId);
char *distributionColumnName =
ColumnToColumnName(parentRelationId, (Node *) parentDistributionColumn);
char parentDistributionMethod = DISTRIBUTE_BY_HASH;
char *parentRelationName = generate_qualified_relation_name(parentRelationId);
bool viaDeprecatedAPI = false;
@ -385,7 +387,7 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(parentRelationId,
relationId);
CreateDistributedTable(relationId, parentDistributionColumn,
CreateDistributedTable(relationId, distributionColumnName,
parentDistributionMethod, ShardCount, false,
parentRelationName, viaDeprecatedAPI);
}
@ -573,13 +575,8 @@ static void
DistributePartitionUsingParent(Oid parentCitusRelationId, Oid partitionRelationId)
{
Var *distributionColumn = DistPartitionKeyOrError(parentCitusRelationId);
char *distributionColumnName =
ColumnToColumnName(parentCitusRelationId,
nodeToString(distributionColumn));
distributionColumn =
FindColumnWithNameOnTargetRelation(parentCitusRelationId,
distributionColumnName,
partitionRelationId);
char *distributionColumnName = ColumnToColumnName(parentCitusRelationId,
(Node *) distributionColumn);
char distributionMethod = DISTRIBUTE_BY_HASH;
char *parentRelationName = generate_qualified_relation_name(parentCitusRelationId);
@ -588,7 +585,7 @@ DistributePartitionUsingParent(Oid parentCitusRelationId, Oid partitionRelationI
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(
parentCitusRelationId, partitionRelationId);
CreateDistributedTable(partitionRelationId, distributionColumn,
CreateDistributedTable(partitionRelationId, distributionColumnName,
distributionMethod, ShardCount, false,
parentRelationName, viaDeprecatedAPI);
}

View File

@ -1022,7 +1022,6 @@ DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
StringInfo insertDistributionCommand = makeStringInfo();
Oid relationId = cacheEntry->relationId;
char distributionMethod = cacheEntry->partitionMethod;
char *partitionKeyString = cacheEntry->partitionKeyString;
char *qualifiedRelationName =
generate_qualified_relation_name(relationId);
uint32 colocationId = cacheEntry->colocationId;
@ -1036,7 +1035,7 @@ DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
else
{
char *partitionKeyColumnName =
ColumnToColumnName(relationId, partitionKeyString);
ColumnToColumnName(relationId, (Node *) cacheEntry->partitionColumn);
appendStringInfo(tablePartitionKeyNameString, "%s",
quote_literal_cstr(partitionKeyColumnName));
}
@ -2445,12 +2444,10 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
distributionColumnText = PG_GETARG_TEXT_P(2);
distributionColumnString = text_to_cstring(distributionColumnText);
Relation relation = relation_open(relationId, AccessShareLock);
distributionColumnVar =
BuildDistributionKeyFromColumnName(relation, distributionColumnString);
BuildDistributionKeyFromColumnName(relationId, distributionColumnString,
AccessShareLock);
Assert(distributionColumnVar != NULL);
relation_close(relation, NoLock);
}
if (!ShouldSkipMetadataChecks())

View File

@ -1013,9 +1013,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
StringInfo errorHint = makeStringInfo();
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
distributedTableId);
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnToColumnName(distributedTableId,
partitionKeyString);
char *partitionColumnName =
ColumnToColumnName(distributedTableId,
(Node *) cacheEntry->partitionColumn);
appendStringInfo(errorHint, "Consider using an equality filter on "
"partition column \"%s\" to target a single shard.",
@ -3053,8 +3053,8 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
if (prunedShardIntervalCount != 1)
{
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnToColumnName(distributedTableId,
partitionKeyString);
char *partitionColumnName =
ColumnToColumnName(distributedTableId, stringToNode(partitionKeyString));
StringInfo errorMessage = makeStringInfo();
StringInfo errorHint = makeStringInfo();
const char *targetCountType = NULL;

View File

@ -56,15 +56,12 @@ column_name_to_column(PG_FUNCTION_ARGS)
text *columnText = PG_GETARG_TEXT_P(1);
char *columnName = text_to_cstring(columnText);
Relation relation = relation_open(relationId, AccessShareLock);
Var *column = BuildDistributionKeyFromColumnName(relation, columnName);
Var *column = BuildDistributionKeyFromColumnName(relationId, columnName,
AccessShareLock);
Assert(column != NULL);
char *columnNodeString = nodeToString(column);
text *columnNodeText = cstring_to_text(columnNodeString);
relation_close(relation, AccessShareLock);
PG_RETURN_TEXT_P(columnNodeText);
}
@ -81,13 +78,10 @@ column_name_to_column_id(PG_FUNCTION_ARGS)
Oid distributedTableId = PG_GETARG_OID(0);
char *columnName = PG_GETARG_CSTRING(1);
Relation relation = relation_open(distributedTableId, AccessExclusiveLock);
Var *column = BuildDistributionKeyFromColumnName(relation, columnName);
Var *column = BuildDistributionKeyFromColumnName(distributedTableId, columnName,
AccessExclusiveLock);
Assert(column != NULL);
relation_close(relation, NoLock);
PG_RETURN_INT16((int16) column->varattno);
}
@ -107,8 +101,9 @@ column_to_column_name(PG_FUNCTION_ARGS)
text *columnNodeText = PG_GETARG_TEXT_P(1);
char *columnNodeString = text_to_cstring(columnNodeText);
Node *columnNode = stringToNode(columnNodeString);
char *columnName = ColumnToColumnName(relationId, columnNodeString);
char *columnName = ColumnToColumnName(relationId, columnNode);
text *columnText = cstring_to_text(columnName);
@ -116,53 +111,6 @@ column_to_column_name(PG_FUNCTION_ARGS)
}
/*
 * FindColumnWithNameOnTargetRelation looks up, on the target relation, the
 * column that carries the given name and returns a Var referring to it. The
 * source relation is only used for the error message that is raised when no
 * usable column name is supplied.
 *
 * The lookup is done by name because, due to dropping columns, the parent's
 * distribution key may not match the partition's distribution key. See
 * issue #5123.
 *
 * The function errors out if the column name is NULL or empty, or if the
 * target relation does not have a column with that name.
 */
Var *
FindColumnWithNameOnTargetRelation(Oid sourceRelationId, char *sourceColumnName,
								   Oid targetRelationId)
{
	/* reject a missing or empty column name up front */
	if (sourceColumnName == NULL || *sourceColumnName == '\0')
	{
		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
						errmsg("cannot find the given column on table \"%s\"",
							   generate_qualified_relation_name(sourceRelationId))));
	}

	/* resolve the name against the target relation, not the source */
	AttrNumber targetAttrNumber = get_attnum(targetRelationId, sourceColumnName);
	if (targetAttrNumber == InvalidAttrNumber)
	{
		ereport(ERROR, (errmsg("Column \"%s\" does not exist on "
							   "relation \"%s\"", sourceColumnName,
							   get_rel_name(targetRelationId))));
	}

	/* placeholders that get_atttypetypmodcoll overwrites */
	Oid columnTypeId = InvalidOid;
	int32 columnTypeMod = 0;
	Oid columnCollation = InvalidOid;

	/* this function throws error in case anything goes wrong */
	get_atttypetypmodcoll(targetRelationId, targetAttrNumber,
						  &columnTypeId, &columnTypeMod, &columnCollation);

	/* varno and varlevelsup handed to makeVar */
	Index rangeTableIndex = 1;
	Index outerQueryLevels = 0;

	return makeVar(rangeTableIndex, targetAttrNumber, columnTypeId, columnTypeMod,
				   columnCollation, outerQueryLevels);
}
/*
* BuildDistributionKeyFromColumnName builds a simple distribution key consisting
* only out of a reference to the column of name columnName. Errors out if the
@ -173,9 +121,18 @@ FindColumnWithNameOnTargetRelation(Oid sourceRelationId, char *sourceColumnName,
* corresponds to reference tables.
*/
Var *
BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnName)
BuildDistributionKeyFromColumnName(Oid relationId, char *columnName, LOCKMODE lockMode)
{
char *tableName = RelationGetRelationName(distributedRelation);
Relation relation = try_relation_open(relationId, ExclusiveLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("relation does not exist")));
}
relation_close(relation, NoLock);
char *tableName = get_rel_name(relationId);
/* short circuit for reference tables */
if (columnName == NULL)
@ -187,8 +144,7 @@ BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnNam
truncate_identifier(columnName, strlen(columnName), true);
/* lookup column definition */
HeapTuple columnTuple = SearchSysCacheAttName(RelationGetRelid(distributedRelation),
columnName);
HeapTuple columnTuple = SearchSysCacheAttName(relationId, columnName);
if (!HeapTupleIsValid(columnTuple))
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
@ -218,15 +174,13 @@ BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnNam
/*
* ColumnToColumnName returns the human-readable name of a column given a
* relation identifier and the column's internal textual (Var) representation.
* relation identifier and the column's internal (Var) representation.
* This function will raise an ERROR if no such column can be found or if the
* provided Var refers to a system column.
*/
char *
ColumnToColumnName(Oid relationId, char *columnNodeString)
ColumnToColumnName(Oid relationId, Node *columnNode)
{
Node *columnNode = stringToNode(columnNodeString);
if (columnNode == NULL || !IsA(columnNode, Var))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

View File

@ -19,11 +19,9 @@
/* Remaining metadata utility functions */
extern Var * FindColumnWithNameOnTargetRelation(Oid sourceRelationId,
char *sourceColumnName,
Oid targetRelationId);
extern Var * BuildDistributionKeyFromColumnName(Relation distributedRelation,
char *columnName);
extern char * ColumnToColumnName(Oid relationId, char *columnNodeString);
extern Var * BuildDistributionKeyFromColumnName(Oid relationId,
char *columnName,
LOCKMODE lockMode);
extern char * ColumnToColumnName(Oid relationId, Node *columnNode);
#endif /* DISTRIBUTION_COLUMN_H */

View File

@ -238,7 +238,7 @@ extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName,
bool viaDeprecatedAPI);

View File

@ -220,7 +220,7 @@ SELECT column_to_column_name('pg_dist_node'::regclass,'{FROMEXPR :fromlist ({RAN
ERROR: not a valid column
-- test column_name_to_column with illegal arguments
SELECT column_name_to_column(1204127312,'');
ERROR: could not open relation with OID 1204127312
ERROR: relation does not exist
SELECT column_name_to_column('customers','notacolumn');
ERROR: column "notacolumn" of relation "customers" does not exist
-- make one huge shard and manually inspect shard row