Generate qualified relation name (#7427)

This change refactors the code by using generate_qualified_relation_name
from id instead of using a sequence of functions to generate the
relation name.


Fixes #6602
pull/7435/head
eaydingol 2024-01-22 17:32:49 +03:00 committed by GitHub
parent 4b295cc857
commit ee11492a0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 36 additions and 83 deletions

View File

@ -209,12 +209,9 @@ static void ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommand
static bool HasAnyGeneratedStoredColumns(Oid relationId); static bool HasAnyGeneratedStoredColumns(Oid relationId);
static List * GetNonGeneratedStoredColumnNameList(Oid relationId); static List * GetNonGeneratedStoredColumnNameList(Oid relationId);
static void CheckAlterDistributedTableConversionParameters(TableConversionState *con); static void CheckAlterDistributedTableConversionParameters(TableConversionState *con);
static char * CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, static char * CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName,
char *sequenceName, char *qualifiedSourceName,
char *sourceSchemaName, char *qualifiedTargetName);
char *sourceName,
char *targetSchemaName,
char *targetName);
static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid); static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid);
static char * CreateMaterializedViewDDLCommand(Oid matViewOid); static char * CreateMaterializedViewDDLCommand(Oid matViewOid);
static char * GetAccessMethodForMatViewIfExists(Oid viewOid); static char * GetAccessMethodForMatViewIfExists(Oid viewOid);
@ -791,13 +788,15 @@ ConvertTableInternal(TableConversionState *con)
justBeforeDropCommands = lappend(justBeforeDropCommands, detachFromParentCommand); justBeforeDropCommands = lappend(justBeforeDropCommands, detachFromParentCommand);
} }
char *qualifiedRelationName = quote_qualified_identifier(con->schemaName,
con->relationName);
if (PartitionedTable(con->relationId)) if (PartitionedTable(con->relationId))
{ {
if (!con->suppressNoticeMessages) if (!con->suppressNoticeMessages)
{ {
ereport(NOTICE, (errmsg("converting the partitions of %s", ereport(NOTICE, (errmsg("converting the partitions of %s",
quote_qualified_identifier(con->schemaName, qualifiedRelationName)));
con->relationName))));
} }
List *partitionList = PartitionList(con->relationId); List *partitionList = PartitionList(con->relationId);
@ -870,9 +869,7 @@ ConvertTableInternal(TableConversionState *con)
if (!con->suppressNoticeMessages) if (!con->suppressNoticeMessages)
{ {
ereport(NOTICE, (errmsg("creating a new table for %s", ereport(NOTICE, (errmsg("creating a new table for %s", qualifiedRelationName)));
quote_qualified_identifier(con->schemaName,
con->relationName))));
} }
TableDDLCommand *tableCreationCommand = NULL; TableDDLCommand *tableCreationCommand = NULL;
@ -999,8 +996,6 @@ ConvertTableInternal(TableConversionState *con)
{ {
continue; continue;
} }
char *qualifiedRelationName = quote_qualified_identifier(con->schemaName,
con->relationName);
TableConversionParameters cascadeParam = { TableConversionParameters cascadeParam = {
.relationId = colocatedTableId, .relationId = colocatedTableId,
@ -1750,9 +1745,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
{ {
StringInfo query = makeStringInfo(); StringInfo query = makeStringInfo();
char *viewName = get_rel_name(matViewOid); char *qualifiedViewName = generate_qualified_relation_name(matViewOid);
char *schemaName = get_namespace_name(get_rel_namespace(matViewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
/* here we need to get the access method of the view to recreate it */ /* here we need to get the access method of the view to recreate it */
char *accessMethodName = GetAccessMethodForMatViewIfExists(matViewOid); char *accessMethodName = GetAccessMethodForMatViewIfExists(matViewOid);
@ -1801,9 +1794,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
bool suppressNoticeMessages) bool suppressNoticeMessages)
{ {
char *sourceName = get_rel_name(sourceId); char *sourceName = get_rel_name(sourceId);
char *targetName = get_rel_name(targetId); char *qualifiedSourceName = generate_qualified_relation_name(sourceId);
Oid schemaId = get_rel_namespace(sourceId); char *qualifiedTargetName = generate_qualified_relation_name(targetId);
char *schemaName = get_namespace_name(schemaId);
StringInfo query = makeStringInfo(); StringInfo query = makeStringInfo();
@ -1811,8 +1803,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
{ {
if (!suppressNoticeMessages) if (!suppressNoticeMessages)
{ {
ereport(NOTICE, (errmsg("moving the data of %s", ereport(NOTICE, (errmsg("moving the data of %s", qualifiedSourceName)));
quote_qualified_identifier(schemaName, sourceName))));
} }
if (!HasAnyGeneratedStoredColumns(sourceId)) if (!HasAnyGeneratedStoredColumns(sourceId))
@ -1822,8 +1813,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
* "INSERT INTO .. SELECT *"". * "INSERT INTO .. SELECT *"".
*/ */
appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s", appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s",
quote_qualified_identifier(schemaName, targetName), qualifiedTargetName, qualifiedSourceName);
quote_qualified_identifier(schemaName, sourceName));
} }
else else
{ {
@ -1838,9 +1828,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
char *insertColumnString = StringJoin(nonStoredColumnNameList, ','); char *insertColumnString = StringJoin(nonStoredColumnNameList, ',');
appendStringInfo(query, appendStringInfo(query,
"INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE SELECT %s FROM %s", "INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE SELECT %s FROM %s",
quote_qualified_identifier(schemaName, targetName), qualifiedTargetName, insertColumnString,
insertColumnString, insertColumnString, insertColumnString, qualifiedSourceName);
quote_qualified_identifier(schemaName, sourceName));
} }
ExecuteQueryViaSPI(query->data, SPI_OK_INSERT); ExecuteQueryViaSPI(query->data, SPI_OK_INSERT);
@ -1864,14 +1853,11 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
*/ */
if (ShouldSyncTableMetadata(targetId)) if (ShouldSyncTableMetadata(targetId))
{ {
Oid sequenceSchemaOid = get_rel_namespace(sequenceOid); char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid);
char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid);
char *sequenceName = get_rel_name(sequenceOid);
char *workerChangeSequenceDependencyCommand = char *workerChangeSequenceDependencyCommand =
CreateWorkerChangeSequenceDependencyCommand(sequenceSchemaName, CreateWorkerChangeSequenceDependencyCommand(qualifiedSequenceName,
sequenceName, qualifiedSourceName,
schemaName, sourceName, qualifiedTargetName);
schemaName, targetName);
SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand); SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand);
} }
else if (ShouldSyncTableMetadata(sourceId)) else if (ShouldSyncTableMetadata(sourceId))
@ -1894,25 +1880,23 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
if (!suppressNoticeMessages) if (!suppressNoticeMessages)
{ {
ereport(NOTICE, (errmsg("dropping the old %s", ereport(NOTICE, (errmsg("dropping the old %s", qualifiedSourceName)));
quote_qualified_identifier(schemaName, sourceName))));
} }
resetStringInfo(query); resetStringInfo(query);
appendStringInfo(query, "DROP %sTABLE %s CASCADE", appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "", IsForeignTable(sourceId) ? "FOREIGN " : "",
quote_qualified_identifier(schemaName, sourceName)); qualifiedSourceName);
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
if (!suppressNoticeMessages) if (!suppressNoticeMessages)
{ {
ereport(NOTICE, (errmsg("renaming the new table to %s", ereport(NOTICE, (errmsg("renaming the new table to %s", qualifiedSourceName)));
quote_qualified_identifier(schemaName, sourceName))));
} }
resetStringInfo(query); resetStringInfo(query);
appendStringInfo(query, "ALTER TABLE %s RENAME TO %s", appendStringInfo(query, "ALTER TABLE %s RENAME TO %s",
quote_qualified_identifier(schemaName, targetName), qualifiedTargetName,
quote_identifier(sourceName)); quote_identifier(sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
} }
@ -2172,18 +2156,13 @@ CheckAlterDistributedTableConversionParameters(TableConversionState *con)
* worker_change_sequence_dependency query with the parameters. * worker_change_sequence_dependency query with the parameters.
*/ */
static char * static char *
CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequenceName, CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName,
char *sourceSchemaName, char *sourceName, char *qualifiedSourceName,
char *targetSchemaName, char *targetName) char *qualifiedTargetName)
{ {
char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName,
sequenceName);
char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName);
char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName);
StringInfo query = makeStringInfo(); StringInfo query = makeStringInfo();
appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)", appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)",
quote_literal_cstr(qualifiedSchemaName), quote_literal_cstr(qualifiedSequeceName),
quote_literal_cstr(qualifiedSourceName), quote_literal_cstr(qualifiedSourceName),
quote_literal_cstr(qualifiedTargetName)); quote_literal_cstr(qualifiedTargetName));

View File

@ -1160,9 +1160,7 @@ DropIdentitiesOnTable(Oid relationId)
if (attributeForm->attidentity) if (attributeForm->attidentity)
{ {
char *tableName = get_rel_name(relationId); char *qualifiedTableName = generate_qualified_relation_name(relationId);
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
StringInfo dropCommand = makeStringInfo(); StringInfo dropCommand = makeStringInfo();
@ -1222,9 +1220,7 @@ DropViewsOnTable(Oid relationId)
Oid viewId = InvalidOid; Oid viewId = InvalidOid;
foreach_oid(viewId, reverseOrderedViews) foreach_oid(viewId, reverseOrderedViews)
{ {
char *viewName = get_rel_name(viewId); char *qualifiedViewName = generate_qualified_relation_name(viewId);
char *schemaName = get_namespace_name(get_rel_namespace(viewId));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
StringInfo dropCommand = makeStringInfo(); StringInfo dropCommand = makeStringInfo();
appendStringInfo(dropCommand, "DROP %sVIEW IF EXISTS %s", appendStringInfo(dropCommand, "DROP %sVIEW IF EXISTS %s",

View File

@ -1323,10 +1323,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{ {
List *partitionList = PartitionList(relationId); List *partitionList = PartitionList(relationId);
Oid partitionRelationId = InvalidOid; Oid partitionRelationId = InvalidOid;
Oid namespaceId = get_rel_namespace(relationId); char *parentRelationName = generate_qualified_relation_name(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
char *parentRelationName = quote_qualified_identifier(schemaName, relationName);
/* /*
* when there are many partitions, each call to CreateDistributedTable * when there are many partitions, each call to CreateDistributedTable

View File

@ -2547,12 +2547,8 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
if (columnNulls[partitionColumnIndex]) if (columnNulls[partitionColumnIndex])
{ {
Oid relationId = copyDest->distributedRelationId; char *qualifiedTableName = generate_qualified_relation_name(
char *relationName = get_rel_name(relationId); copyDest->distributedRelationId);
Oid schemaOid = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaOid);
char *qualifiedTableName = quote_qualified_identifier(schemaName,
relationName);
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column of table %s cannot be NULL", errmsg("the partition column of table %s cannot be NULL",

View File

@ -392,9 +392,7 @@ CreateViewDDLCommand(Oid viewOid)
static void static void
AppendQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid) AppendQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid)
{ {
char *viewName = get_rel_name(viewOid); char *qualifiedViewName = generate_qualified_relation_name(viewOid);
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
appendStringInfo(buf, "%s ", qualifiedViewName); appendStringInfo(buf, "%s ", qualifiedViewName);
} }

View File

@ -143,15 +143,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
targetRelation->partitionColumn); targetRelation->partitionColumn);
if (distributionColumnIndex == -1) if (distributionColumnIndex == -1)
{ {
char *relationName = get_rel_name(targetRelationId);
Oid schemaOid = get_rel_namespace(targetRelationId);
char *schemaName = get_namespace_name(schemaOid);
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg( errmsg(
"the partition column of table %s should have a value", "the partition column of table %s should have a value",
quote_qualified_identifier(schemaName, generate_qualified_relation_name(targetRelationId))));
relationName))));
} }
TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList, TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList,

View File

@ -1945,11 +1945,7 @@ ConstructQualifiedShardName(ShardInterval *shardInterval)
static List * static List *
RecreateTableDDLCommandList(Oid relationId) RecreateTableDDLCommandList(Oid relationId)
{ {
const char *relationName = get_rel_name(relationId); const char *qualifiedRelationName = generate_qualified_relation_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
const char *relationSchemaName = get_namespace_name(relationSchemaId);
const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName,
relationName);
StringInfo dropCommand = makeStringInfo(); StringInfo dropCommand = makeStringInfo();

View File

@ -170,14 +170,10 @@ WorkerDropDistributedTable(Oid relationId)
*/ */
if (!IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL)) if (!IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL))
{ {
char *relName = get_rel_name(relationId);
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
StringInfo dropCommand = makeStringInfo(); StringInfo dropCommand = makeStringInfo();
appendStringInfo(dropCommand, "DROP%sTABLE %s CASCADE", appendStringInfo(dropCommand, "DROP%sTABLE %s CASCADE",
IsForeignTable(relationId) ? " FOREIGN " : " ", IsForeignTable(relationId) ? " FOREIGN " : " ",
quote_qualified_identifier(schemaName, relName)); generate_qualified_relation_name(relationId));
Node *dropCommandNode = ParseTreeNode(dropCommand->data); Node *dropCommandNode = ParseTreeNode(dropCommand->data);