diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index eaec55c3e..e1900642d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -175,7 +175,7 @@ that are missing in earlier minor versions. ### Following our coding conventions -CircleCI will automatically reject any PRs which do not follow our coding +CI pipeline will automatically reject any PRs which do not follow our coding conventions. The easiest way to ensure your PR adheres to those conventions is to use the [citus_indent](https://github.com/citusdata/tools/tree/develop/uncrustify) tool. This tool uses `uncrustify` under the hood. diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index a81f23ad6..030dbbe78 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -209,12 +209,9 @@ static void ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommand static bool HasAnyGeneratedStoredColumns(Oid relationId); static List * GetNonGeneratedStoredColumnNameList(Oid relationId); static void CheckAlterDistributedTableConversionParameters(TableConversionState *con); -static char * CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, - char *sequenceName, - char *sourceSchemaName, - char *sourceName, - char *targetSchemaName, - char *targetName); +static char * CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName, + char *qualifiedSourceName, + char *qualifiedTargetName); static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid); static char * CreateMaterializedViewDDLCommand(Oid matViewOid); static char * GetAccessMethodForMatViewIfExists(Oid viewOid); @@ -791,13 +788,15 @@ ConvertTableInternal(TableConversionState *con) justBeforeDropCommands = lappend(justBeforeDropCommands, detachFromParentCommand); } + char *qualifiedRelationName = quote_qualified_identifier(con->schemaName, + con->relationName); + if (PartitionedTable(con->relationId)) { if (!con->suppressNoticeMessages) { ereport(NOTICE, (errmsg("converting the partitions of %s", - quote_qualified_identifier(con->schemaName, - con->relationName)))); + qualifiedRelationName))); } List *partitionList = PartitionList(con->relationId); @@ -870,9 +869,7 @@ ConvertTableInternal(TableConversionState *con) if (!con->suppressNoticeMessages) { - ereport(NOTICE, (errmsg("creating a new table for %s", - quote_qualified_identifier(con->schemaName, - con->relationName)))); + ereport(NOTICE, (errmsg("creating a new table for %s", qualifiedRelationName))); } TableDDLCommand *tableCreationCommand = NULL; @@ -999,8 +996,6 @@ ConvertTableInternal(TableConversionState *con) { continue; } - char *qualifiedRelationName = quote_qualified_identifier(con->schemaName, - con->relationName); TableConversionParameters cascadeParam = { .relationId = colocatedTableId, @@ -1750,9 +1745,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid) { StringInfo query = makeStringInfo(); - char *viewName = get_rel_name(matViewOid); - char *schemaName = get_namespace_name(get_rel_namespace(matViewOid)); - char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName); + char *qualifiedViewName = generate_qualified_relation_name(matViewOid); /* here we need to get the access method of the view to recreate it */ char *accessMethodName = GetAccessMethodForMatViewIfExists(matViewOid); @@ -1801,9 +1794,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, bool suppressNoticeMessages) { char *sourceName = get_rel_name(sourceId); - char *targetName = get_rel_name(targetId); - Oid schemaId = get_rel_namespace(sourceId); - char *schemaName = get_namespace_name(schemaId); + char *qualifiedSourceName = generate_qualified_relation_name(sourceId); + char *qualifiedTargetName = generate_qualified_relation_name(targetId); StringInfo query = makeStringInfo(); @@ -1811,8 +1803,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, { if (!suppressNoticeMessages) { - ereport(NOTICE, (errmsg("moving the data of %s", - quote_qualified_identifier(schemaName, sourceName)))); + ereport(NOTICE, (errmsg("moving the data of %s", qualifiedSourceName))); } if (!HasAnyGeneratedStoredColumns(sourceId)) @@ -1822,8 +1813,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, * "INSERT INTO .. SELECT *"". */ appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s", - quote_qualified_identifier(schemaName, targetName), - quote_qualified_identifier(schemaName, sourceName)); + qualifiedTargetName, qualifiedSourceName); } else { @@ -1838,9 +1828,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, char *insertColumnString = StringJoin(nonStoredColumnNameList, ','); appendStringInfo(query, "INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE SELECT %s FROM %s", - quote_qualified_identifier(schemaName, targetName), - insertColumnString, insertColumnString, - quote_qualified_identifier(schemaName, sourceName)); + qualifiedTargetName, insertColumnString, + insertColumnString, qualifiedSourceName); } ExecuteQueryViaSPI(query->data, SPI_OK_INSERT); @@ -1864,14 +1853,11 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, */ if (ShouldSyncTableMetadata(targetId)) { - Oid sequenceSchemaOid = get_rel_namespace(sequenceOid); - char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid); - char *sequenceName = get_rel_name(sequenceOid); + char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid); char *workerChangeSequenceDependencyCommand = - CreateWorkerChangeSequenceDependencyCommand(sequenceSchemaName, - sequenceName, - schemaName, sourceName, - schemaName, targetName); + CreateWorkerChangeSequenceDependencyCommand(qualifiedSequenceName, + qualifiedSourceName, + qualifiedTargetName); SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand); } else if (ShouldSyncTableMetadata(sourceId)) @@ -1894,25 +1880,23 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, if (!suppressNoticeMessages) { - ereport(NOTICE, (errmsg("dropping the old %s", - quote_qualified_identifier(schemaName, sourceName)))); + ereport(NOTICE, (errmsg("dropping the old %s", qualifiedSourceName))); } resetStringInfo(query); appendStringInfo(query, "DROP %sTABLE %s CASCADE", IsForeignTable(sourceId) ? "FOREIGN " : "", - quote_qualified_identifier(schemaName, sourceName)); + qualifiedSourceName); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); if (!suppressNoticeMessages) { - ereport(NOTICE, (errmsg("renaming the new table to %s", - quote_qualified_identifier(schemaName, sourceName)))); + ereport(NOTICE, (errmsg("renaming the new table to %s", qualifiedSourceName))); } resetStringInfo(query); appendStringInfo(query, "ALTER TABLE %s RENAME TO %s", - quote_qualified_identifier(schemaName, targetName), + qualifiedTargetName, quote_identifier(sourceName)); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); } @@ -2172,18 +2156,13 @@ CheckAlterDistributedTableConversionParameters(TableConversionState *con) * worker_change_sequence_dependency query with the parameters. */ static char * -CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequenceName, - char *sourceSchemaName, char *sourceName, - char *targetSchemaName, char *targetName) +CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName, + char *qualifiedSourceName, + char *qualifiedTargetName) { - char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName, - sequenceName); - char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName); - char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName); - StringInfo query = makeStringInfo(); appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)", - quote_literal_cstr(qualifiedSchemaName), + quote_literal_cstr(qualifiedSequeceName), quote_literal_cstr(qualifiedSourceName), quote_literal_cstr(qualifiedTargetName)); diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index d95cdd353..93f1e7d28 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -1160,9 +1160,7 @@ DropIdentitiesOnTable(Oid relationId) if (attributeForm->attidentity) { - char *tableName = get_rel_name(relationId); - char *schemaName = get_namespace_name(get_rel_namespace(relationId)); - char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); + char *qualifiedTableName = generate_qualified_relation_name(relationId); StringInfo dropCommand = makeStringInfo(); @@ -1222,9 +1220,7 @@ DropViewsOnTable(Oid relationId) Oid viewId = InvalidOid; foreach_oid(viewId, reverseOrderedViews) { - char *viewName = get_rel_name(viewId); - char *schemaName = get_namespace_name(get_rel_namespace(viewId)); - char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName); + char *qualifiedViewName = generate_qualified_relation_name(viewId); StringInfo dropCommand = makeStringInfo(); appendStringInfo(dropCommand, "DROP %sVIEW IF EXISTS %s", diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9f3975a1e..5ec6d6dd7 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1323,10 +1323,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, { List *partitionList = PartitionList(relationId); Oid partitionRelationId = InvalidOid; - Oid namespaceId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(namespaceId); - char *relationName = get_rel_name(relationId); - char *parentRelationName = quote_qualified_identifier(schemaName, relationName); + char *parentRelationName = generate_qualified_relation_name(relationId); /* * when there are many partitions, each call to CreateDistributedTable diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 36267ff66..2ead0c58a 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -776,7 +776,7 @@ PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree) /*create extension citus version xxx*/ if (newVersionValue) { - char *newVersion = strdup(defGetString(newVersionValue)); + char *newVersion = pstrdup(defGetString(newVersionValue)); versionNumber = GetExtensionVersionNumber(newVersion); } @@ -796,7 +796,7 @@ PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree) Oid citusOid = get_extension_oid("citus", true); if (citusOid != InvalidOid) { - char *curCitusVersion = strdup(get_extension_version(citusOid)); + char *curCitusVersion = pstrdup(get_extension_version(citusOid)); int curCitusVersionNum = GetExtensionVersionNumber(curCitusVersion); if (curCitusVersionNum < 1110) { @@ -891,7 +891,7 @@ PreprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree) if (newVersionValue) { char *newVersion = defGetString(newVersionValue); - double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion)); + double newVersionNumber = GetExtensionVersionNumber(pstrdup(newVersion)); /*alter extension citus update to version >= 11.1-1, and no citus_columnar installed */ if (newVersionNumber >= 1110 && citusColumnarOid == InvalidOid) @@ -935,7 +935,7 @@ PostprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree) if (newVersionValue) { char *newVersion = defGetString(newVersionValue); - double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion)); + double newVersionNumber = GetExtensionVersionNumber(pstrdup(newVersion)); if (newVersionNumber >= 1110 && citusColumnarOid != InvalidOid) { /*upgrade citus, after "ALTER EXTENSION citus update to xxx" updates citus_columnar Y to version Z. */ diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c69e33f94..0284ea64d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2547,12 +2547,8 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu if (columnNulls[partitionColumnIndex]) { - Oid relationId = copyDest->distributedRelationId; - char *relationName = get_rel_name(relationId); - Oid schemaOid = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(schemaOid); - char *qualifiedTableName = quote_qualified_identifier(schemaName, - relationName); + char *qualifiedTableName = generate_qualified_relation_name( + copyDest->distributedRelationId); ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("the partition column of table %s cannot be NULL", diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index c2155383a..68af4b7b5 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -92,7 +92,7 @@ #define START_MANAGEMENT_TRANSACTION \ "SELECT citus_internal.start_management_transaction('%lu')" #define MARK_OBJECT_DISTRIBUTED \ - "SELECT citus_internal.mark_object_distributed(%d, %s, %d)" + "SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -1636,7 +1636,8 @@ RunPostprocessMainDBCommand(Node *parsetree) MARK_OBJECT_DISTRIBUTED, AuthIdRelationId, quote_literal_cstr(createRoleStmt->role), - roleOid); + roleOid, + quote_literal_cstr(CurrentUserName())); RunCitusMainDBQuery(mainDBQuery->data); } } diff --git a/src/backend/distributed/commands/view.c b/src/backend/distributed/commands/view.c index 0c39be4ca..9689b9267 100644 --- a/src/backend/distributed/commands/view.c +++ b/src/backend/distributed/commands/view.c @@ -392,9 +392,7 @@ CreateViewDDLCommand(Oid viewOid) static void AppendQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid) { - char *viewName = get_rel_name(viewOid); - char *schemaName = get_namespace_name(get_rel_namespace(viewOid)); - char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName); + char *qualifiedViewName = generate_qualified_relation_name(viewOid); appendStringInfo(buf, "%s ", qualifiedViewName); } diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index 57069f698..c52254f9c 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -123,6 +123,10 @@ AddConnParam(const char *keyword, const char *value) errmsg("ConnParams arrays bound check failed"))); } + /* + * Don't use pstrdup here to avoid being tied to a memory context, we free + * these later using ResetConnParams + */ ConnParams.keywords[ConnParams.size] = strdup(keyword); ConnParams.values[ConnParams.size] = strdup(value); ConnParams.size++; diff --git a/src/backend/distributed/deparser/ruleutils_14.c b/src/backend/distributed/deparser/ruleutils_14.c index 01b74eab1..88948cff5 100644 --- a/src/backend/distributed/deparser/ruleutils_14.c +++ b/src/backend/distributed/deparser/ruleutils_14.c @@ -1526,8 +1526,15 @@ set_join_column_names(deparse_namespace *dpns, RangeTblEntry *rte, /* Assert we processed the right number of columns */ #ifdef USE_ASSERT_CHECKING - while (i < colinfo->num_cols && colinfo->colnames[i] == NULL) - i++; + for (int col_index = 0; col_index < colinfo->num_cols; col_index++) + { + /* + * In the above processing-loops, "i" advances only if + * the column is not new, check if this is a new column. + */ + if (colinfo->is_new_col[col_index]) + i++; + } Assert(i == colinfo->num_cols); Assert(j == nnewcolumns); #endif diff --git a/src/backend/distributed/deparser/ruleutils_16.c b/src/backend/distributed/deparser/ruleutils_16.c index 10373e487..7f2a41d75 100644 --- a/src/backend/distributed/deparser/ruleutils_16.c +++ b/src/backend/distributed/deparser/ruleutils_16.c @@ -1580,8 +1580,15 @@ set_join_column_names(deparse_namespace *dpns, RangeTblEntry *rte, /* Assert we processed the right number of columns */ #ifdef USE_ASSERT_CHECKING - while (i < colinfo->num_cols && colinfo->colnames[i] == NULL) - i++; + for (int col_index = 0; col_index < colinfo->num_cols; col_index++) + { + /* + * In the above processing-loops, "i" advances only if + * the column is not new, check if this is a new column. + */ + if (colinfo->is_new_col[col_index]) + i++; + } Assert(i == colinfo->num_cols); Assert(j == nnewcolumns); #endif diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1b0277f2e..e912f418d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -727,6 +727,11 @@ static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time end static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey); static void SetAttributeInputMetadata(DistributedExecution *execution, ShardCommandExecution *shardCommandExecution); +static ExecutionParams * CreateDefaultExecutionParams(RowModifyLevel modLevel, + List *taskList, + TupleDestination *tupleDest, + bool expectResults, + ParamListInfo paramListInfo); /* @@ -1013,14 +1018,14 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, /* - * ExecuteTaskListIntoTupleDestWithParam is a proxy to ExecuteTaskListExtended() which uses - * bind params from executor state, and with defaults for some of the arguments. + * CreateDefaultExecutionParams returns execution params based on given (possibly null) + * bind params (presumably from executor state) with defaults for some of the arguments. */ -uint64 -ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList, - TupleDestination *tupleDest, - bool expectResults, - ParamListInfo paramListInfo) +static ExecutionParams * +CreateDefaultExecutionParams(RowModifyLevel modLevel, List *taskList, + TupleDestination *tupleDest, + bool expectResults, + ParamListInfo paramListInfo) { int targetPoolSize = MaxAdaptiveExecutorPoolSize; bool localExecutionSupported = true; @@ -1034,6 +1039,24 @@ ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList, executionParams->tupleDestination = tupleDest; executionParams->paramListInfo = paramListInfo; + return executionParams; +} + + +/* + * ExecuteTaskListIntoTupleDestWithParam is a proxy to ExecuteTaskListExtended() which uses + * bind params from executor state, and with defaults for some of the arguments. + */ +uint64 +ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList, + TupleDestination *tupleDest, + bool expectResults, + ParamListInfo paramListInfo) +{ + ExecutionParams *executionParams = CreateDefaultExecutionParams(modLevel, taskList, + tupleDest, + expectResults, + paramListInfo); return ExecuteTaskListExtended(executionParams); } @@ -1047,17 +1070,11 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList, TupleDestination *tupleDest, bool expectResults) { - int targetPoolSize = MaxAdaptiveExecutorPoolSize; - bool localExecutionSupported = true; - ExecutionParams *executionParams = CreateBasicExecutionParams( - modLevel, taskList, targetPoolSize, localExecutionSupported - ); - - executionParams->xactProperties = DecideTransactionPropertiesForTaskList( - modLevel, taskList, false); - executionParams->expectResults = expectResults; - executionParams->tupleDestination = tupleDest; - + ParamListInfo paramListInfo = NULL; + ExecutionParams *executionParams = CreateDefaultExecutionParams(modLevel, taskList, + tupleDest, + expectResults, + paramListInfo); return ExecuteTaskListExtended(executionParams); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index f5fbb3f78..a8dc1fa5a 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -143,15 +143,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node) targetRelation->partitionColumn); if (distributionColumnIndex == -1) { - char *relationName = get_rel_name(targetRelationId); - Oid schemaOid = get_rel_namespace(targetRelationId); - char *schemaName = get_namespace_name(schemaOid); - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg( "the partition column of table %s should have a value", - quote_qualified_identifier(schemaName, - relationName)))); + generate_qualified_relation_name(targetRelationId)))); } TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList, diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 1d07be8c3..007d07bdc 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -67,7 +67,8 @@ PG_FUNCTION_INFO_V1(master_unmark_object_distributed); /* * mark_object_distributed adds an object to pg_dist_object - * in all of the nodes. + * in all of the nodes, for the connections to the other nodes this function + * uses the user passed. */ Datum mark_object_distributed(PG_FUNCTION_ARGS) @@ -81,6 +82,8 @@ mark_object_distributed(PG_FUNCTION_ARGS) Oid objectId = PG_GETARG_OID(2); ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*objectAddress, classId, objectId); + text *connectionUserText = PG_GETARG_TEXT_P(3); + char *connectionUser = text_to_cstring(connectionUserText); /* * This function is called when a query is run from a Citus non-main database. @@ -88,7 +91,8 @@ mark_object_distributed(PG_FUNCTION_ARGS) * 2PC still works. */ bool useConnectionForLocalQuery = true; - MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery); + MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery, + connectionUser); PG_RETURN_VOID(); } @@ -193,7 +197,8 @@ void MarkObjectDistributed(const ObjectAddress *distAddress) { bool useConnectionForLocalQuery = false; - MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery); + MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery, + CurrentUserName()); } @@ -204,7 +209,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress) */ void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName, - bool useConnectionForLocalQuery) + bool useConnectionForLocalQuery, char *connectionUser) { if (!CitusHasBeenLoaded()) { @@ -234,7 +239,8 @@ MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName { char *workerPgDistObjectUpdateCommand = CreatePgDistObjectEntryCommand(distAddress, objectName); - SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand); + SendCommandToRemoteMetadataNodesParams(workerPgDistObjectUpdateCommand, + connectionUser, 0, NULL, NULL); } } diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 7d6747caf..805ef39d7 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1945,11 +1945,7 @@ ConstructQualifiedShardName(ShardInterval *shardInterval) static List * RecreateTableDDLCommandList(Oid relationId) { - const char *relationName = get_rel_name(relationId); - Oid relationSchemaId = get_rel_namespace(relationId); - const char *relationSchemaName = get_namespace_name(relationSchemaId); - const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName, - relationName); + const char *qualifiedRelationName = generate_qualified_relation_name(relationId); StringInfo dropCommand = makeStringInfo(); diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c index 827a0286c..bef91618e 100644 --- a/src/backend/distributed/planner/query_colocation_checker.c +++ b/src/backend/distributed/planner/query_colocation_checker.c @@ -433,7 +433,7 @@ CreateTargetEntryForColumn(Form_pg_attribute attributeTuple, Index rteIndex, attributeTuple->atttypmod, attributeTuple->attcollation, 0); TargetEntry *targetEntry = makeTargetEntry((Expr *) targetColumn, resno, - strdup(attributeTuple->attname.data), false); + pstrdup(attributeTuple->attname.data), false); return targetEntry; } @@ -449,7 +449,7 @@ CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple, int resno) attributeTuple->attcollation); char *resName = attributeTuple->attname.data; TargetEntry *targetEntry = - makeTargetEntry(nullExpr, resno, strdup(resName), false); + makeTargetEntry(nullExpr, resno, pstrdup(resName), false); return targetEntry; } diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index f889a0095..20d85444f 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -15,7 +15,7 @@ DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user( ); DROP FUNCTION citus_internal.mark_object_distributed( - classId Oid, objectName text, objectId Oid + classId Oid, objectName text, objectId Oid, connectionUser text ); DROP FUNCTION citus_internal.commit_management_command_2pc(); diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql index ee2c5e7e8..25d35c028 100644 --- a/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql @@ -1,7 +1,7 @@ -CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) RETURNS VOID LANGUAGE C AS 'MODULE_PATHNAME', $$mark_object_distributed$$; -COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql index ee2c5e7e8..25d35c028 100644 --- a/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql @@ -1,7 +1,7 @@ -CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) RETURNS VOID LANGUAGE C AS 'MODULE_PATHNAME', $$mark_object_distributed$$; -COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 9c8563de0..c6fcee107 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -36,10 +36,6 @@ #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" -static void SendCommandToRemoteMetadataNodesParams(const char *command, - const char *user, int parameterCount, - const Oid *parameterTypes, - const char *const *parameterValues); static void SendBareCommandListToMetadataNodesInternal(List *commandList, TargetWorkerSet targetWorkerSet); static void SendCommandToMetadataWorkersParams(const char *command, @@ -209,7 +205,7 @@ SendCommandListToRemoteNodesWithMetadata(List *commands) * SendCommandToWorkersParamsInternal() that can be used to send commands * to remote metadata nodes. */ -static void +void SendCommandToRemoteMetadataNodesParams(const char *command, const char *user, int parameterCount, const Oid *parameterTypes, diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 6d7b5326a..280de4493 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -170,14 +170,10 @@ WorkerDropDistributedTable(Oid relationId) */ if (!IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL)) { - char *relName = get_rel_name(relationId); - Oid schemaId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(schemaId); - StringInfo dropCommand = makeStringInfo(); appendStringInfo(dropCommand, "DROP%sTABLE %s CASCADE", IsForeignTable(relationId) ? " FOREIGN " : " ", - quote_qualified_identifier(schemaName, relName)); + generate_qualified_relation_name(relationId)); Node *dropCommandNode = ParseTreeNode(dropCommand->data); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 888133a89..16df367aa 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -92,38 +92,21 @@ CitusNodeTagI(Node *node) return ((CitusNode*)(node))->citus_tag; } -/* - * Postgres's nodes/nodes.h has more information on why we do this. - */ -#ifdef __GNUC__ /* Citus variant of newNode(), don't use directly. */ -#define CitusNewNode(size, tag) \ -({ CitusNode *_result; \ - AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \ - _result = (CitusNode *) palloc0fast(size); \ - _result->extensible.type = T_ExtensibleNode; \ - _result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \ - _result->citus_tag =(int) (tag); \ - _result; \ -}) +static inline CitusNode * +CitusNewNode(size_t size, CitusNodeTag tag) +{ + CitusNode *result; -#else - -extern CitusNode *newCitusNodeMacroHolder; - -#define CitusNewNode(size, tag) \ -( \ - AssertMacro((size) >= sizeof(CitusNode)), /* need the tag, at least */ \ - newCitusNodeMacroHolder = (CitusNode *) palloc0fast(size), \ - newCitusNodeMacroHolder->extensible.type = T_ExtensibleNode, \ - newCitusNodeMacroHolder->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START], \ - newCitusNodeMacroHolder->citus_tag =(int) (tag), \ - newCitusNodeMacroHolder \ -) - -#endif + Assert(size >= sizeof(CitusNode)); /* need the ExtensibleNode and the tag, at least */ + result = (CitusNode *) palloc0(size); + result->extensible.type = T_ExtensibleNode; + result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; + result->citus_tag = (int) (tag); + return result; +} /* * IsA equivalent that compares node tags, including Citus-specific nodes. diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 13f38178b..e98e6ee86 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -24,7 +24,8 @@ extern bool IsAnyObjectDistributed(const List *addresses); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name, - bool useConnectionForLocalQuery); + bool useConnectionForLocalQuery, + char *connectionUser); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index b9a855828..1b3809a0e 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -68,6 +68,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const char *nodeUser, const char *command); extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *nodeUser, const char *command); +extern void SendCommandToRemoteMetadataNodesParams(const char *command, + const char *user, int parameterCount, + const Oid *parameterTypes, + const char *const *parameterValues); extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, diff --git a/src/test/regress/citus_tests/test/test_other_databases.py b/src/test/regress/citus_tests/test/test_other_databases.py index 925b065a7..494301692 100644 --- a/src/test/regress/citus_tests/test/test_other_databases.py +++ b/src/test/regress/citus_tests/test/test_other_databases.py @@ -22,7 +22,7 @@ def test_main_commited_outer_not_yet(cluster): "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" ) cur2.execute( - "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" + "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123, 'postgres')" ) cur2.execute("COMMIT") @@ -133,7 +133,7 @@ def test_main_commited_outer_aborted(cluster): "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')" ) cur2.execute( - "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)" + "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321, 'postgres')" ) cur2.execute("COMMIT") diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 60e283800..57dbe3e75 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1424,7 +1424,7 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- | function citus_internal.commit_management_command_2pc() void | function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void - | function citus_internal.mark_object_distributed(oid,text,oid) void + | function citus_internal.mark_object_distributed(oid,text,oid,text) void | function citus_internal.start_management_transaction(xid8) void | function citus_internal_acquire_citus_advisory_object_class_lock(integer,cstring) void | function citus_internal_database_command(text) void diff --git a/src/test/regress/expected/other_databases.out b/src/test/regress/expected/other_databases.out index 1b81af3b7..9e170861e 100644 --- a/src/test/regress/expected/other_databases.out +++ b/src/test/regress/expected/other_databases.out @@ -71,6 +71,7 @@ SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerou ERROR: operation is not allowed HINT: Run the command with a superuser. \c other_db1 +SET ROLE nonsuperuser; CREATE USER other_db_user9; RESET ROLE; \c regression diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 97e5c0928..9bd542f05 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -59,7 +59,7 @@ ORDER BY 1; function citus_internal.commit_management_command_2pc() function citus_internal.execute_command_on_remote_nodes_as_user(text,text) function citus_internal.find_groupid_for_node(text,integer) - function citus_internal.mark_object_distributed(oid,text,oid) + function citus_internal.mark_object_distributed(oid,text,oid,text) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_trigger_func() function citus_internal.pg_dist_shard_placement_trigger_func() diff --git a/src/test/regress/sql/other_databases.sql b/src/test/regress/sql/other_databases.sql index 629f74f45..563793518 100644 --- a/src/test/regress/sql/other_databases.sql +++ b/src/test/regress/sql/other_databases.sql @@ -51,6 +51,7 @@ SET ROLE nonsuperuser; SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); \c other_db1 +SET ROLE nonsuperuser; CREATE USER other_db_user9; RESET ROLE;