From 4b295cc857bda0a90ce0c3bed6a26bd4a7e448c9 Mon Sep 17 00:00:00 2001 From: zhjwpku Date: Mon, 22 Jan 2024 21:55:14 +0800 Subject: [PATCH 1/8] Simplify CitusNewNode (#7434) postgres refactored newNode() in PG 17, the main point for doing this is the original tricks is no longer neccessary for modern compilers[1]. This does the same for Citus. This should have no backward compatibility issues since it just replaces palloc0fast with palloc0. This is good for forward compatibility since palloc0fast no longer exists in PG 17. [1] https://www.postgresql.org/message-id/b51f1fa7-7e6a-4ecc-936d-90a8a1659e7c@iki.fi --- src/include/distributed/citus_nodes.h | 39 ++++++++------------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 888133a89..16df367aa 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -92,38 +92,21 @@ CitusNodeTagI(Node *node) return ((CitusNode*)(node))->citus_tag; } -/* - * Postgres's nodes/nodes.h has more information on why we do this. - */ -#ifdef __GNUC__ /* Citus variant of newNode(), don't use directly. */ -#define CitusNewNode(size, tag) \ -({ CitusNode *_result; \ - AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \ - _result = (CitusNode *) palloc0fast(size); \ - _result->extensible.type = T_ExtensibleNode; \ - _result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \ - _result->citus_tag =(int) (tag); \ - _result; \ -}) +static inline CitusNode * +CitusNewNode(size_t size, CitusNodeTag tag) +{ + CitusNode *result; -#else - -extern CitusNode *newCitusNodeMacroHolder; - -#define CitusNewNode(size, tag) \ -( \ - AssertMacro((size) >= sizeof(CitusNode)), /* need the tag, at least */ \ - newCitusNodeMacroHolder = (CitusNode *) palloc0fast(size), \ - newCitusNodeMacroHolder->extensible.type = T_ExtensibleNode, \ - newCitusNodeMacroHolder->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START], \ - newCitusNodeMacroHolder->citus_tag =(int) (tag), \ - newCitusNodeMacroHolder \ -) - -#endif + Assert(size >= sizeof(CitusNode)); /* need the ExtensibleNode and the tag, at least */ + result = (CitusNode *) palloc0(size); + result->extensible.type = T_ExtensibleNode; + result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; + result->citus_tag = (int) (tag); + return result; +} /* * IsA equivalent that compares node tags, including Citus-specific nodes. From ee11492a0ed080b4c669460ae523cb437ab2faeb Mon Sep 17 00:00:00 2001 From: eaydingol <60466783+eaydingol@users.noreply.github.com> Date: Mon, 22 Jan 2024 17:32:49 +0300 Subject: [PATCH 2/8] Generate qualified relation name (#7427) This change refactors the code by using generate_qualified_relation_name from id instead of using a sequence of functions to generate the relation name. Fixes #6602 --- .../distributed/commands/alter_table.c | 75 +++++++------------ .../citus_add_local_table_to_metadata.c | 8 +- .../commands/create_distributed_table.c | 5 +- src/backend/distributed/commands/multi_copy.c | 8 +- src/backend/distributed/commands/view.c | 4 +- .../executor/insert_select_executor.c | 7 +- .../distributed/operations/shard_transfer.c | 6 +- .../distributed/worker/worker_drop_protocol.c | 6 +- 8 files changed, 36 insertions(+), 83 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index a81f23ad6..030dbbe78 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -209,12 +209,9 @@ static void ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommand static bool HasAnyGeneratedStoredColumns(Oid relationId); static List * GetNonGeneratedStoredColumnNameList(Oid relationId); static void CheckAlterDistributedTableConversionParameters(TableConversionState *con); -static char * CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, - char *sequenceName, - char *sourceSchemaName, - char *sourceName, - char *targetSchemaName, - char *targetName); +static char * CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName, + char *qualifiedSourceName, + char *qualifiedTargetName); static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid); static char * CreateMaterializedViewDDLCommand(Oid matViewOid); static char * GetAccessMethodForMatViewIfExists(Oid viewOid); @@ -791,13 +788,15 @@ ConvertTableInternal(TableConversionState *con) justBeforeDropCommands = lappend(justBeforeDropCommands, detachFromParentCommand); } + char *qualifiedRelationName = quote_qualified_identifier(con->schemaName, + con->relationName); + if (PartitionedTable(con->relationId)) { if (!con->suppressNoticeMessages) { ereport(NOTICE, (errmsg("converting the partitions of %s", - quote_qualified_identifier(con->schemaName, - con->relationName)))); + qualifiedRelationName))); } List *partitionList = PartitionList(con->relationId); @@ -870,9 +869,7 @@ ConvertTableInternal(TableConversionState *con) if (!con->suppressNoticeMessages) { - ereport(NOTICE, (errmsg("creating a new table for %s", - quote_qualified_identifier(con->schemaName, - con->relationName)))); + ereport(NOTICE, (errmsg("creating a new table for %s", qualifiedRelationName))); } TableDDLCommand *tableCreationCommand = NULL; @@ -999,8 +996,6 @@ ConvertTableInternal(TableConversionState *con) { continue; } - char *qualifiedRelationName = quote_qualified_identifier(con->schemaName, - con->relationName); TableConversionParameters cascadeParam = { .relationId = colocatedTableId, @@ -1750,9 +1745,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid) { StringInfo query = makeStringInfo(); - char *viewName = get_rel_name(matViewOid); - char *schemaName = get_namespace_name(get_rel_namespace(matViewOid)); - char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName); + char *qualifiedViewName = generate_qualified_relation_name(matViewOid); /* here we need to get the access method of the view to recreate it */ char *accessMethodName = GetAccessMethodForMatViewIfExists(matViewOid); @@ -1801,9 +1794,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, bool suppressNoticeMessages) { char *sourceName = get_rel_name(sourceId); - char *targetName = get_rel_name(targetId); - Oid schemaId = get_rel_namespace(sourceId); - char *schemaName = get_namespace_name(schemaId); + char *qualifiedSourceName = generate_qualified_relation_name(sourceId); + char *qualifiedTargetName = generate_qualified_relation_name(targetId); StringInfo query = makeStringInfo(); @@ -1811,8 +1803,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, { if (!suppressNoticeMessages) { - ereport(NOTICE, (errmsg("moving the data of %s", - quote_qualified_identifier(schemaName, sourceName)))); + ereport(NOTICE, (errmsg("moving the data of %s", qualifiedSourceName))); } if (!HasAnyGeneratedStoredColumns(sourceId)) @@ -1822,8 +1813,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, * "INSERT INTO .. SELECT *"". */ appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s", - quote_qualified_identifier(schemaName, targetName), - quote_qualified_identifier(schemaName, sourceName)); + qualifiedTargetName, qualifiedSourceName); } else { @@ -1838,9 +1828,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, char *insertColumnString = StringJoin(nonStoredColumnNameList, ','); appendStringInfo(query, "INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE SELECT %s FROM %s", - quote_qualified_identifier(schemaName, targetName), - insertColumnString, insertColumnString, - quote_qualified_identifier(schemaName, sourceName)); + qualifiedTargetName, insertColumnString, + insertColumnString, qualifiedSourceName); } ExecuteQueryViaSPI(query->data, SPI_OK_INSERT); @@ -1864,14 +1853,11 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, */ if (ShouldSyncTableMetadata(targetId)) { - Oid sequenceSchemaOid = get_rel_namespace(sequenceOid); - char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid); - char *sequenceName = get_rel_name(sequenceOid); + char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid); char *workerChangeSequenceDependencyCommand = - CreateWorkerChangeSequenceDependencyCommand(sequenceSchemaName, - sequenceName, - schemaName, sourceName, - schemaName, targetName); + CreateWorkerChangeSequenceDependencyCommand(qualifiedSequenceName, + qualifiedSourceName, + qualifiedTargetName); SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand); } else if (ShouldSyncTableMetadata(sourceId)) @@ -1894,25 +1880,23 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, if (!suppressNoticeMessages) { - ereport(NOTICE, (errmsg("dropping the old %s", - quote_qualified_identifier(schemaName, sourceName)))); + ereport(NOTICE, (errmsg("dropping the old %s", qualifiedSourceName))); } resetStringInfo(query); appendStringInfo(query, "DROP %sTABLE %s CASCADE", IsForeignTable(sourceId) ? "FOREIGN " : "", - quote_qualified_identifier(schemaName, sourceName)); + qualifiedSourceName); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); if (!suppressNoticeMessages) { - ereport(NOTICE, (errmsg("renaming the new table to %s", - quote_qualified_identifier(schemaName, sourceName)))); + ereport(NOTICE, (errmsg("renaming the new table to %s", qualifiedSourceName))); } resetStringInfo(query); appendStringInfo(query, "ALTER TABLE %s RENAME TO %s", - quote_qualified_identifier(schemaName, targetName), + qualifiedTargetName, quote_identifier(sourceName)); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); } @@ -2172,18 +2156,13 @@ CheckAlterDistributedTableConversionParameters(TableConversionState *con) * worker_change_sequence_dependency query with the parameters. */ static char * -CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequenceName, - char *sourceSchemaName, char *sourceName, - char *targetSchemaName, char *targetName) +CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName, + char *qualifiedSourceName, + char *qualifiedTargetName) { - char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName, - sequenceName); - char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName); - char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName); - StringInfo query = makeStringInfo(); appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)", - quote_literal_cstr(qualifiedSchemaName), + quote_literal_cstr(qualifiedSequeceName), quote_literal_cstr(qualifiedSourceName), quote_literal_cstr(qualifiedTargetName)); diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index d95cdd353..93f1e7d28 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -1160,9 +1160,7 @@ DropIdentitiesOnTable(Oid relationId) if (attributeForm->attidentity) { - char *tableName = get_rel_name(relationId); - char *schemaName = get_namespace_name(get_rel_namespace(relationId)); - char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); + char *qualifiedTableName = generate_qualified_relation_name(relationId); StringInfo dropCommand = makeStringInfo(); @@ -1222,9 +1220,7 @@ DropViewsOnTable(Oid relationId) Oid viewId = InvalidOid; foreach_oid(viewId, reverseOrderedViews) { - char *viewName = get_rel_name(viewId); - char *schemaName = get_namespace_name(get_rel_namespace(viewId)); - char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName); + char *qualifiedViewName = generate_qualified_relation_name(viewId); StringInfo dropCommand = makeStringInfo(); appendStringInfo(dropCommand, "DROP %sVIEW IF EXISTS %s", diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9f3975a1e..5ec6d6dd7 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1323,10 +1323,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, { List *partitionList = PartitionList(relationId); Oid partitionRelationId = InvalidOid; - Oid namespaceId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(namespaceId); - char *relationName = get_rel_name(relationId); - char *parentRelationName = quote_qualified_identifier(schemaName, relationName); + char *parentRelationName = generate_qualified_relation_name(relationId); /* * when there are many partitions, each call to CreateDistributedTable diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c69e33f94..0284ea64d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2547,12 +2547,8 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu if (columnNulls[partitionColumnIndex]) { - Oid relationId = copyDest->distributedRelationId; - char *relationName = get_rel_name(relationId); - Oid schemaOid = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(schemaOid); - char *qualifiedTableName = quote_qualified_identifier(schemaName, - relationName); + char *qualifiedTableName = generate_qualified_relation_name( + copyDest->distributedRelationId); ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("the partition column of table %s cannot be NULL", diff --git a/src/backend/distributed/commands/view.c b/src/backend/distributed/commands/view.c index 0c39be4ca..9689b9267 100644 --- a/src/backend/distributed/commands/view.c +++ b/src/backend/distributed/commands/view.c @@ -392,9 +392,7 @@ CreateViewDDLCommand(Oid viewOid) static void AppendQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid) { - char *viewName = get_rel_name(viewOid); - char *schemaName = get_namespace_name(get_rel_namespace(viewOid)); - char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName); + char *qualifiedViewName = generate_qualified_relation_name(viewOid); appendStringInfo(buf, "%s ", qualifiedViewName); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index f5fbb3f78..a8dc1fa5a 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -143,15 +143,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node) targetRelation->partitionColumn); if (distributionColumnIndex == -1) { - char *relationName = get_rel_name(targetRelationId); - Oid schemaOid = get_rel_namespace(targetRelationId); - char *schemaName = get_namespace_name(schemaOid); - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg( "the partition column of table %s should have a value", - quote_qualified_identifier(schemaName, - relationName)))); + generate_qualified_relation_name(targetRelationId)))); } TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList, diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 7d6747caf..805ef39d7 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1945,11 +1945,7 @@ ConstructQualifiedShardName(ShardInterval *shardInterval) static List * RecreateTableDDLCommandList(Oid relationId) { - const char *relationName = get_rel_name(relationId); - Oid relationSchemaId = get_rel_namespace(relationId); - const char *relationSchemaName = get_namespace_name(relationSchemaId); - const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName, - relationName); + const char *qualifiedRelationName = generate_qualified_relation_name(relationId); StringInfo dropCommand = makeStringInfo(); diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 6d7b5326a..280de4493 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -170,14 +170,10 @@ WorkerDropDistributedTable(Oid relationId) */ if (!IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL)) { - char *relName = get_rel_name(relationId); - Oid schemaId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(schemaId); - StringInfo dropCommand = makeStringInfo(); appendStringInfo(dropCommand, "DROP%sTABLE %s CASCADE", IsForeignTable(relationId) ? " FOREIGN " : " ", - quote_qualified_identifier(schemaName, relName)); + generate_qualified_relation_name(relationId)); Node *dropCommandNode = ParseTreeNode(dropCommand->data); From 72fbea20c49c6de74f0c5005f77efce9f3a54178 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 23 Jan 2024 11:55:03 +0100 Subject: [PATCH 3/8] Replace spurious strdup with pstrdup (#7440) Not sure why we never found this using valgrind, but using strdup will cause memory leaks because the pointer is not tracked in a memory context. --- src/backend/distributed/planner/query_colocation_checker.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c index 827a0286c..d9b1aad5a 100644 --- a/src/backend/distributed/planner/query_colocation_checker.c +++ b/src/backend/distributed/planner/query_colocation_checker.c @@ -433,7 +433,7 @@ CreateTargetEntryForColumn(Form_pg_attribute attributeTuple, Index rteIndex, attributeTuple->atttypmod, attributeTuple->attcollation, 0); TargetEntry *targetEntry = makeTargetEntry((Expr *) targetColumn, resno, - strdup(attributeTuple->attname.data), false); + pstrdup(attributeTuple->attname.data), false); return targetEntry; } From 9683bef2ecf624b54a42bbd94ed4f4c4e765aa7b Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Tue, 23 Jan 2024 13:28:26 +0100 Subject: [PATCH 4/8] Replace more spurious strdups with pstrdups (#7441) DESCRIPTION: Remove a few small memory leaks In #7440 one instance of a strdup was removed. But there were a few more. This removes the ones that are left over, or adds a comment why strdup is on purpose. --- src/backend/distributed/commands/extension.c | 8 ++++---- .../distributed/connection/connection_configuration.c | 4 ++++ .../distributed/planner/query_colocation_checker.c | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 36267ff66..2ead0c58a 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -776,7 +776,7 @@ PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree) /*create extension citus version xxx*/ if (newVersionValue) { - char *newVersion = strdup(defGetString(newVersionValue)); + char *newVersion = pstrdup(defGetString(newVersionValue)); versionNumber = GetExtensionVersionNumber(newVersion); } @@ -796,7 +796,7 @@ PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree) Oid citusOid = get_extension_oid("citus", true); if (citusOid != InvalidOid) { - char *curCitusVersion = strdup(get_extension_version(citusOid)); + char *curCitusVersion = pstrdup(get_extension_version(citusOid)); int curCitusVersionNum = GetExtensionVersionNumber(curCitusVersion); if (curCitusVersionNum < 1110) { @@ -891,7 +891,7 @@ PreprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree) if (newVersionValue) { char *newVersion = defGetString(newVersionValue); - double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion)); + double newVersionNumber = GetExtensionVersionNumber(pstrdup(newVersion)); /*alter extension citus update to version >= 11.1-1, and no citus_columnar installed */ if (newVersionNumber >= 1110 && citusColumnarOid == InvalidOid) @@ -935,7 +935,7 @@ PostprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree) if (newVersionValue) { char *newVersion = defGetString(newVersionValue); - double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion)); + double newVersionNumber = GetExtensionVersionNumber(pstrdup(newVersion)); if (newVersionNumber >= 1110 && citusColumnarOid != InvalidOid) { /*upgrade citus, after "ALTER EXTENSION citus update to xxx" updates citus_columnar Y to version Z. */ diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index 57069f698..c52254f9c 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -123,6 +123,10 @@ AddConnParam(const char *keyword, const char *value) errmsg("ConnParams arrays bound check failed"))); } + /* + * Don't use pstrdup here to avoid being tied to a memory context, we free + * these later using ResetConnParams + */ ConnParams.keywords[ConnParams.size] = strdup(keyword); ConnParams.values[ConnParams.size] = strdup(value); ConnParams.size++; diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c index d9b1aad5a..bef91618e 100644 --- a/src/backend/distributed/planner/query_colocation_checker.c +++ b/src/backend/distributed/planner/query_colocation_checker.c @@ -449,7 +449,7 @@ CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple, int resno) attributeTuple->attcollation); char *resName = attributeTuple->attname.data; TargetEntry *targetEntry = - makeTargetEntry(nullExpr, resno, strdup(resName), false); + makeTargetEntry(nullExpr, resno, pstrdup(resName), false); return targetEntry; } From 11d7c273523c5893f195455c839551f9999bc9c9 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Tue, 23 Jan 2024 10:24:45 -0800 Subject: [PATCH 5/8] Fix assertions in other PG versions too, the original fix is in PR-7379 --- src/backend/distributed/deparser/ruleutils_14.c | 11 +++++++++-- src/backend/distributed/deparser/ruleutils_16.c | 11 +++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/deparser/ruleutils_14.c b/src/backend/distributed/deparser/ruleutils_14.c index 01b74eab1..88948cff5 100644 --- a/src/backend/distributed/deparser/ruleutils_14.c +++ b/src/backend/distributed/deparser/ruleutils_14.c @@ -1526,8 +1526,15 @@ set_join_column_names(deparse_namespace *dpns, RangeTblEntry *rte, /* Assert we processed the right number of columns */ #ifdef USE_ASSERT_CHECKING - while (i < colinfo->num_cols && colinfo->colnames[i] == NULL) - i++; + for (int col_index = 0; col_index < colinfo->num_cols; col_index++) + { + /* + * In the above processing-loops, "i" advances only if + * the column is not new, check if this is a new column. + */ + if (colinfo->is_new_col[col_index]) + i++; + } Assert(i == colinfo->num_cols); Assert(j == nnewcolumns); #endif diff --git a/src/backend/distributed/deparser/ruleutils_16.c b/src/backend/distributed/deparser/ruleutils_16.c index 10373e487..7f2a41d75 100644 --- a/src/backend/distributed/deparser/ruleutils_16.c +++ b/src/backend/distributed/deparser/ruleutils_16.c @@ -1580,8 +1580,15 @@ set_join_column_names(deparse_namespace *dpns, RangeTblEntry *rte, /* Assert we processed the right number of columns */ #ifdef USE_ASSERT_CHECKING - while (i < colinfo->num_cols && colinfo->colnames[i] == NULL) - i++; + for (int col_index = 0; col_index < colinfo->num_cols; col_index++) + { + /* + * In the above processing-loops, "i" advances only if + * the column is not new, check if this is a new column. + */ + if (colinfo->is_new_col[col_index]) + i++; + } Assert(i == colinfo->num_cols); Assert(j == nnewcolumns); #endif From 863713e9b7aed4762ed9851fea0dbf68bcdab323 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCrkan=20=C4=B0ndibay?= Date: Wed, 24 Jan 2024 09:00:19 +0300 Subject: [PATCH 6/8] Refactors ExtendedTaskList methods (#7372) ExecuteTaskListIntoTupleDestWithParam and ExecuteTaskListIntoTupleDest are nearly the same. I parameterized and a made a reusable structure here --------- Co-authored-by: Onur Tirtir --- .../distributed/executor/adaptive_executor.c | 53 ++++++++++++------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1b0277f2e..e912f418d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -727,6 +727,11 @@ static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time end static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey); static void SetAttributeInputMetadata(DistributedExecution *execution, ShardCommandExecution *shardCommandExecution); +static ExecutionParams * CreateDefaultExecutionParams(RowModifyLevel modLevel, + List *taskList, + TupleDestination *tupleDest, + bool expectResults, + ParamListInfo paramListInfo); /* @@ -1013,14 +1018,14 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, /* - * ExecuteTaskListIntoTupleDestWithParam is a proxy to ExecuteTaskListExtended() which uses - * bind params from executor state, and with defaults for some of the arguments. + * CreateDefaultExecutionParams returns execution params based on given (possibly null) + * bind params (presumably from executor state) with defaults for some of the arguments. */ -uint64 -ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList, - TupleDestination *tupleDest, - bool expectResults, - ParamListInfo paramListInfo) +static ExecutionParams * +CreateDefaultExecutionParams(RowModifyLevel modLevel, List *taskList, + TupleDestination *tupleDest, + bool expectResults, + ParamListInfo paramListInfo) { int targetPoolSize = MaxAdaptiveExecutorPoolSize; bool localExecutionSupported = true; @@ -1034,6 +1039,24 @@ ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList, executionParams->tupleDestination = tupleDest; executionParams->paramListInfo = paramListInfo; + return executionParams; +} + + +/* + * ExecuteTaskListIntoTupleDestWithParam is a proxy to ExecuteTaskListExtended() which uses + * bind params from executor state, and with defaults for some of the arguments. + */ +uint64 +ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList, + TupleDestination *tupleDest, + bool expectResults, + ParamListInfo paramListInfo) +{ + ExecutionParams *executionParams = CreateDefaultExecutionParams(modLevel, taskList, + tupleDest, + expectResults, + paramListInfo); return ExecuteTaskListExtended(executionParams); } @@ -1047,17 +1070,11 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList, TupleDestination *tupleDest, bool expectResults) { - int targetPoolSize = MaxAdaptiveExecutorPoolSize; - bool localExecutionSupported = true; - ExecutionParams *executionParams = CreateBasicExecutionParams( - modLevel, taskList, targetPoolSize, localExecutionSupported - ); - - executionParams->xactProperties = DecideTransactionPropertiesForTaskList( - modLevel, taskList, false); - executionParams->expectResults = expectResults; - executionParams->tupleDestination = tupleDest; - + ParamListInfo paramListInfo = NULL; + ExecutionParams *executionParams = CreateDefaultExecutionParams(modLevel, taskList, + tupleDest, + expectResults, + paramListInfo); return ExecuteTaskListExtended(executionParams); } From 3ffb831beb420d7d0794600d3e4b3106897e0ae1 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 24 Jan 2024 11:50:49 +0300 Subject: [PATCH 7/8] Update contributing docs (#7447) This is a minor change to use a generic name instead of our legacy CI provider name in the contributing documentation. --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index eaec55c3e..e1900642d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -175,7 +175,7 @@ that are missing in earlier minor versions. ### Following our coding conventions -CircleCI will automatically reject any PRs which do not follow our coding +CI pipeline will automatically reject any PRs which do not follow our coding conventions. The easiest way to ensure your PR adheres to those conventions is to use the [citus_indent](https://github.com/citusdata/tools/tree/develop/uncrustify) tool. This tool uses `uncrustify` under the hood. From 1cb2e1e4e8d31601f5b250f90b02d71a8a02e82e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20Ozan=20Akg=C3=BCl?= Date: Wed, 24 Jan 2024 12:57:54 +0300 Subject: [PATCH 8/8] Fixes create user queries from Citus non-main databases with other users (#7442) This PR makes the connections to other nodes for `mark_object_distributed` use the same user as `execute_command_on_remote_nodes_as_user` so they'll use the same connection. --- src/backend/distributed/commands/utility_hook.c | 5 +++-- src/backend/distributed/metadata/distobject.c | 16 +++++++++++----- .../sql/downgrades/citus--12.2-1--12.1-1.sql | 2 +- .../sql/udfs/mark_object_distributed/12.2-1.sql | 4 ++-- .../sql/udfs/mark_object_distributed/latest.sql | 4 ++-- .../distributed/transaction/worker_transaction.c | 6 +----- src/include/distributed/metadata/distobject.h | 3 ++- src/include/distributed/worker_transaction.h | 4 ++++ .../citus_tests/test/test_other_databases.py | 4 ++-- src/test/regress/expected/multi_extension.out | 2 +- src/test/regress/expected/other_databases.out | 1 + .../expected/upgrade_list_citus_objects.out | 2 +- src/test/regress/sql/other_databases.sql | 1 + 13 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index c2155383a..68af4b7b5 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -92,7 +92,7 @@ #define START_MANAGEMENT_TRANSACTION \ "SELECT citus_internal.start_management_transaction('%lu')" #define MARK_OBJECT_DISTRIBUTED \ - "SELECT citus_internal.mark_object_distributed(%d, %s, %d)" + "SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -1636,7 +1636,8 @@ RunPostprocessMainDBCommand(Node *parsetree) MARK_OBJECT_DISTRIBUTED, AuthIdRelationId, quote_literal_cstr(createRoleStmt->role), - roleOid); + roleOid, + quote_literal_cstr(CurrentUserName())); RunCitusMainDBQuery(mainDBQuery->data); } } diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 1d07be8c3..007d07bdc 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -67,7 +67,8 @@ PG_FUNCTION_INFO_V1(master_unmark_object_distributed); /* * mark_object_distributed adds an object to pg_dist_object - * in all of the nodes. + * in all of the nodes, for the connections to the other nodes this function + * uses the user passed. */ Datum mark_object_distributed(PG_FUNCTION_ARGS) @@ -81,6 +82,8 @@ mark_object_distributed(PG_FUNCTION_ARGS) Oid objectId = PG_GETARG_OID(2); ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*objectAddress, classId, objectId); + text *connectionUserText = PG_GETARG_TEXT_P(3); + char *connectionUser = text_to_cstring(connectionUserText); /* * This function is called when a query is run from a Citus non-main database. @@ -88,7 +91,8 @@ mark_object_distributed(PG_FUNCTION_ARGS) * 2PC still works. */ bool useConnectionForLocalQuery = true; - MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery); + MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery, + connectionUser); PG_RETURN_VOID(); } @@ -193,7 +197,8 @@ void MarkObjectDistributed(const ObjectAddress *distAddress) { bool useConnectionForLocalQuery = false; - MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery); + MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery, + CurrentUserName()); } @@ -204,7 +209,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress) */ void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName, - bool useConnectionForLocalQuery) + bool useConnectionForLocalQuery, char *connectionUser) { if (!CitusHasBeenLoaded()) { @@ -234,7 +239,8 @@ MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName { char *workerPgDistObjectUpdateCommand = CreatePgDistObjectEntryCommand(distAddress, objectName); - SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand); + SendCommandToRemoteMetadataNodesParams(workerPgDistObjectUpdateCommand, + connectionUser, 0, NULL, NULL); } } diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index f889a0095..20d85444f 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -15,7 +15,7 @@ DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user( ); DROP FUNCTION citus_internal.mark_object_distributed( - classId Oid, objectName text, objectId Oid + classId Oid, objectName text, objectId Oid, connectionUser text ); DROP FUNCTION citus_internal.commit_management_command_2pc(); diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql index ee2c5e7e8..25d35c028 100644 --- a/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql @@ -1,7 +1,7 @@ -CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) RETURNS VOID LANGUAGE C AS 'MODULE_PATHNAME', $$mark_object_distributed$$; -COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql index ee2c5e7e8..25d35c028 100644 --- a/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql @@ -1,7 +1,7 @@ -CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) RETURNS VOID LANGUAGE C AS 'MODULE_PATHNAME', $$mark_object_distributed$$; -COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text) IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 9c8563de0..c6fcee107 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -36,10 +36,6 @@ #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" -static void SendCommandToRemoteMetadataNodesParams(const char *command, - const char *user, int parameterCount, - const Oid *parameterTypes, - const char *const *parameterValues); static void SendBareCommandListToMetadataNodesInternal(List *commandList, TargetWorkerSet targetWorkerSet); static void SendCommandToMetadataWorkersParams(const char *command, @@ -209,7 +205,7 @@ SendCommandListToRemoteNodesWithMetadata(List *commands) * SendCommandToWorkersParamsInternal() that can be used to send commands * to remote metadata nodes. */ -static void +void SendCommandToRemoteMetadataNodesParams(const char *command, const char *user, int parameterCount, const Oid *parameterTypes, diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 13f38178b..e98e6ee86 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -24,7 +24,8 @@ extern bool IsAnyObjectDistributed(const List *addresses); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name, - bool useConnectionForLocalQuery); + bool useConnectionForLocalQuery, + char *connectionUser); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index b9a855828..1b3809a0e 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -68,6 +68,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const char *nodeUser, const char *command); extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *nodeUser, const char *command); +extern void SendCommandToRemoteMetadataNodesParams(const char *command, + const char *user, int parameterCount, + const Oid *parameterTypes, + const char *const *parameterValues); extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, diff --git a/src/test/regress/citus_tests/test/test_other_databases.py b/src/test/regress/citus_tests/test/test_other_databases.py index 925b065a7..494301692 100644 --- a/src/test/regress/citus_tests/test/test_other_databases.py +++ b/src/test/regress/citus_tests/test/test_other_databases.py @@ -22,7 +22,7 @@ def test_main_commited_outer_not_yet(cluster): "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" ) cur2.execute( - "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" + "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123, 'postgres')" ) cur2.execute("COMMIT") @@ -133,7 +133,7 @@ def test_main_commited_outer_aborted(cluster): "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')" ) cur2.execute( - "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)" + "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321, 'postgres')" ) cur2.execute("COMMIT") diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 60e283800..57dbe3e75 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1424,7 +1424,7 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- | function citus_internal.commit_management_command_2pc() void | function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void - | function citus_internal.mark_object_distributed(oid,text,oid) void + | function citus_internal.mark_object_distributed(oid,text,oid,text) void | function citus_internal.start_management_transaction(xid8) void | function citus_internal_acquire_citus_advisory_object_class_lock(integer,cstring) void | function citus_internal_database_command(text) void diff --git a/src/test/regress/expected/other_databases.out b/src/test/regress/expected/other_databases.out index 1b81af3b7..9e170861e 100644 --- a/src/test/regress/expected/other_databases.out +++ b/src/test/regress/expected/other_databases.out @@ -71,6 +71,7 @@ SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerou ERROR: operation is not allowed HINT: Run the command with a superuser. \c other_db1 +SET ROLE nonsuperuser; CREATE USER other_db_user9; RESET ROLE; \c regression diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 97e5c0928..9bd542f05 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -59,7 +59,7 @@ ORDER BY 1; function citus_internal.commit_management_command_2pc() function citus_internal.execute_command_on_remote_nodes_as_user(text,text) function citus_internal.find_groupid_for_node(text,integer) - function citus_internal.mark_object_distributed(oid,text,oid) + function citus_internal.mark_object_distributed(oid,text,oid,text) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_trigger_func() function citus_internal.pg_dist_shard_placement_trigger_func() diff --git a/src/test/regress/sql/other_databases.sql b/src/test/regress/sql/other_databases.sql index 629f74f45..563793518 100644 --- a/src/test/regress/sql/other_databases.sql +++ b/src/test/regress/sql/other_databases.sql @@ -51,6 +51,7 @@ SET ROLE nonsuperuser; SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); \c other_db1 +SET ROLE nonsuperuser; CREATE USER other_db_user9; RESET ROLE;