Merge branch 'main' into main

pull/7432/head
Jelte Fennema-Nio 2024-01-24 12:09:34 +01:00 committed by GitHub
commit aaadd23e17
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 143 additions and 162 deletions

View File

@ -175,7 +175,7 @@ that are missing in earlier minor versions.
### Following our coding conventions
CircleCI will automatically reject any PRs which do not follow our coding
CI pipeline will automatically reject any PRs which do not follow our coding
conventions. The easiest way to ensure your PR adheres to those conventions is
to use the [citus_indent](https://github.com/citusdata/tools/tree/develop/uncrustify)
tool. This tool uses `uncrustify` under the hood.

View File

@ -209,12 +209,9 @@ static void ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommand
static bool HasAnyGeneratedStoredColumns(Oid relationId);
static List * GetNonGeneratedStoredColumnNameList(Oid relationId);
static void CheckAlterDistributedTableConversionParameters(TableConversionState *con);
static char * CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName,
char *sequenceName,
char *sourceSchemaName,
char *sourceName,
char *targetSchemaName,
char *targetName);
static char * CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName,
char *qualifiedSourceName,
char *qualifiedTargetName);
static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid);
static char * CreateMaterializedViewDDLCommand(Oid matViewOid);
static char * GetAccessMethodForMatViewIfExists(Oid viewOid);
@ -791,13 +788,15 @@ ConvertTableInternal(TableConversionState *con)
justBeforeDropCommands = lappend(justBeforeDropCommands, detachFromParentCommand);
}
char *qualifiedRelationName = quote_qualified_identifier(con->schemaName,
con->relationName);
if (PartitionedTable(con->relationId))
{
if (!con->suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("converting the partitions of %s",
quote_qualified_identifier(con->schemaName,
con->relationName))));
qualifiedRelationName)));
}
List *partitionList = PartitionList(con->relationId);
@ -870,9 +869,7 @@ ConvertTableInternal(TableConversionState *con)
if (!con->suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("creating a new table for %s",
quote_qualified_identifier(con->schemaName,
con->relationName))));
ereport(NOTICE, (errmsg("creating a new table for %s", qualifiedRelationName)));
}
TableDDLCommand *tableCreationCommand = NULL;
@ -999,8 +996,6 @@ ConvertTableInternal(TableConversionState *con)
{
continue;
}
char *qualifiedRelationName = quote_qualified_identifier(con->schemaName,
con->relationName);
TableConversionParameters cascadeParam = {
.relationId = colocatedTableId,
@ -1750,9 +1745,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
{
StringInfo query = makeStringInfo();
char *viewName = get_rel_name(matViewOid);
char *schemaName = get_namespace_name(get_rel_namespace(matViewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
char *qualifiedViewName = generate_qualified_relation_name(matViewOid);
/* here we need to get the access method of the view to recreate it */
char *accessMethodName = GetAccessMethodForMatViewIfExists(matViewOid);
@ -1801,9 +1794,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
bool suppressNoticeMessages)
{
char *sourceName = get_rel_name(sourceId);
char *targetName = get_rel_name(targetId);
Oid schemaId = get_rel_namespace(sourceId);
char *schemaName = get_namespace_name(schemaId);
char *qualifiedSourceName = generate_qualified_relation_name(sourceId);
char *qualifiedTargetName = generate_qualified_relation_name(targetId);
StringInfo query = makeStringInfo();
@ -1811,8 +1803,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
{
if (!suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("moving the data of %s",
quote_qualified_identifier(schemaName, sourceName))));
ereport(NOTICE, (errmsg("moving the data of %s", qualifiedSourceName)));
}
if (!HasAnyGeneratedStoredColumns(sourceId))
@ -1822,8 +1813,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
* "INSERT INTO .. SELECT *"".
*/
appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s",
quote_qualified_identifier(schemaName, targetName),
quote_qualified_identifier(schemaName, sourceName));
qualifiedTargetName, qualifiedSourceName);
}
else
{
@ -1838,9 +1828,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
char *insertColumnString = StringJoin(nonStoredColumnNameList, ',');
appendStringInfo(query,
"INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE SELECT %s FROM %s",
quote_qualified_identifier(schemaName, targetName),
insertColumnString, insertColumnString,
quote_qualified_identifier(schemaName, sourceName));
qualifiedTargetName, insertColumnString,
insertColumnString, qualifiedSourceName);
}
ExecuteQueryViaSPI(query->data, SPI_OK_INSERT);
@ -1864,14 +1853,11 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
*/
if (ShouldSyncTableMetadata(targetId))
{
Oid sequenceSchemaOid = get_rel_namespace(sequenceOid);
char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid);
char *sequenceName = get_rel_name(sequenceOid);
char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid);
char *workerChangeSequenceDependencyCommand =
CreateWorkerChangeSequenceDependencyCommand(sequenceSchemaName,
sequenceName,
schemaName, sourceName,
schemaName, targetName);
CreateWorkerChangeSequenceDependencyCommand(qualifiedSequenceName,
qualifiedSourceName,
qualifiedTargetName);
SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand);
}
else if (ShouldSyncTableMetadata(sourceId))
@ -1894,25 +1880,23 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
if (!suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("dropping the old %s",
quote_qualified_identifier(schemaName, sourceName))));
ereport(NOTICE, (errmsg("dropping the old %s", qualifiedSourceName)));
}
resetStringInfo(query);
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
quote_qualified_identifier(schemaName, sourceName));
qualifiedSourceName);
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
if (!suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("renaming the new table to %s",
quote_qualified_identifier(schemaName, sourceName))));
ereport(NOTICE, (errmsg("renaming the new table to %s", qualifiedSourceName)));
}
resetStringInfo(query);
appendStringInfo(query, "ALTER TABLE %s RENAME TO %s",
quote_qualified_identifier(schemaName, targetName),
qualifiedTargetName,
quote_identifier(sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
}
@ -2172,18 +2156,13 @@ CheckAlterDistributedTableConversionParameters(TableConversionState *con)
* worker_change_sequence_dependency query with the parameters.
*/
static char *
CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequenceName,
char *sourceSchemaName, char *sourceName,
char *targetSchemaName, char *targetName)
CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName,
char *qualifiedSourceName,
char *qualifiedTargetName)
{
char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName,
sequenceName);
char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName);
char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName);
StringInfo query = makeStringInfo();
appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)",
quote_literal_cstr(qualifiedSchemaName),
quote_literal_cstr(qualifiedSequeceName),
quote_literal_cstr(qualifiedSourceName),
quote_literal_cstr(qualifiedTargetName));

View File

@ -1160,9 +1160,7 @@ DropIdentitiesOnTable(Oid relationId)
if (attributeForm->attidentity)
{
char *tableName = get_rel_name(relationId);
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
char *qualifiedTableName = generate_qualified_relation_name(relationId);
StringInfo dropCommand = makeStringInfo();
@ -1222,9 +1220,7 @@ DropViewsOnTable(Oid relationId)
Oid viewId = InvalidOid;
foreach_oid(viewId, reverseOrderedViews)
{
char *viewName = get_rel_name(viewId);
char *schemaName = get_namespace_name(get_rel_namespace(viewId));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
char *qualifiedViewName = generate_qualified_relation_name(viewId);
StringInfo dropCommand = makeStringInfo();
appendStringInfo(dropCommand, "DROP %sVIEW IF EXISTS %s",

View File

@ -1323,10 +1323,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{
List *partitionList = PartitionList(relationId);
Oid partitionRelationId = InvalidOid;
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
char *parentRelationName = quote_qualified_identifier(schemaName, relationName);
char *parentRelationName = generate_qualified_relation_name(relationId);
/*
* when there are many partitions, each call to CreateDistributedTable

View File

@ -776,7 +776,7 @@ PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree)
/*create extension citus version xxx*/
if (newVersionValue)
{
char *newVersion = strdup(defGetString(newVersionValue));
char *newVersion = pstrdup(defGetString(newVersionValue));
versionNumber = GetExtensionVersionNumber(newVersion);
}
@ -796,7 +796,7 @@ PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree)
Oid citusOid = get_extension_oid("citus", true);
if (citusOid != InvalidOid)
{
char *curCitusVersion = strdup(get_extension_version(citusOid));
char *curCitusVersion = pstrdup(get_extension_version(citusOid));
int curCitusVersionNum = GetExtensionVersionNumber(curCitusVersion);
if (curCitusVersionNum < 1110)
{
@ -891,7 +891,7 @@ PreprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree)
if (newVersionValue)
{
char *newVersion = defGetString(newVersionValue);
double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion));
double newVersionNumber = GetExtensionVersionNumber(pstrdup(newVersion));
/*alter extension citus update to version >= 11.1-1, and no citus_columnar installed */
if (newVersionNumber >= 1110 && citusColumnarOid == InvalidOid)
@ -935,7 +935,7 @@ PostprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree)
if (newVersionValue)
{
char *newVersion = defGetString(newVersionValue);
double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion));
double newVersionNumber = GetExtensionVersionNumber(pstrdup(newVersion));
if (newVersionNumber >= 1110 && citusColumnarOid != InvalidOid)
{
/*upgrade citus, after "ALTER EXTENSION citus update to xxx" updates citus_columnar Y to version Z. */

View File

@ -2547,12 +2547,8 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
if (columnNulls[partitionColumnIndex])
{
Oid relationId = copyDest->distributedRelationId;
char *relationName = get_rel_name(relationId);
Oid schemaOid = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaOid);
char *qualifiedTableName = quote_qualified_identifier(schemaName,
relationName);
char *qualifiedTableName = generate_qualified_relation_name(
copyDest->distributedRelationId);
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column of table %s cannot be NULL",

View File

@ -92,7 +92,7 @@
#define START_MANAGEMENT_TRANSACTION \
"SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d)"
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
@ -1636,7 +1636,8 @@ RunPostprocessMainDBCommand(Node *parsetree)
MARK_OBJECT_DISTRIBUTED,
AuthIdRelationId,
quote_literal_cstr(createRoleStmt->role),
roleOid);
roleOid,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
}
}

View File

@ -392,9 +392,7 @@ CreateViewDDLCommand(Oid viewOid)
static void
AppendQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid)
{
char *viewName = get_rel_name(viewOid);
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
char *qualifiedViewName = generate_qualified_relation_name(viewOid);
appendStringInfo(buf, "%s ", qualifiedViewName);
}

View File

@ -123,6 +123,10 @@ AddConnParam(const char *keyword, const char *value)
errmsg("ConnParams arrays bound check failed")));
}
/*
* Don't use pstrdup here to avoid being tied to a memory context, we free
* these later using ResetConnParams
*/
ConnParams.keywords[ConnParams.size] = strdup(keyword);
ConnParams.values[ConnParams.size] = strdup(value);
ConnParams.size++;

View File

@ -1526,8 +1526,15 @@ set_join_column_names(deparse_namespace *dpns, RangeTblEntry *rte,
/* Assert we processed the right number of columns */
#ifdef USE_ASSERT_CHECKING
while (i < colinfo->num_cols && colinfo->colnames[i] == NULL)
i++;
for (int col_index = 0; col_index < colinfo->num_cols; col_index++)
{
/*
* In the above processing-loops, "i" advances only if
* the column is not new, check if this is a new column.
*/
if (colinfo->is_new_col[col_index])
i++;
}
Assert(i == colinfo->num_cols);
Assert(j == nnewcolumns);
#endif

View File

@ -1580,8 +1580,15 @@ set_join_column_names(deparse_namespace *dpns, RangeTblEntry *rte,
/* Assert we processed the right number of columns */
#ifdef USE_ASSERT_CHECKING
while (i < colinfo->num_cols && colinfo->colnames[i] == NULL)
i++;
for (int col_index = 0; col_index < colinfo->num_cols; col_index++)
{
/*
* In the above processing-loops, "i" advances only if
* the column is not new, check if this is a new column.
*/
if (colinfo->is_new_col[col_index])
i++;
}
Assert(i == colinfo->num_cols);
Assert(j == nnewcolumns);
#endif

View File

@ -727,6 +727,11 @@ static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time end
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
static void SetAttributeInputMetadata(DistributedExecution *execution,
ShardCommandExecution *shardCommandExecution);
static ExecutionParams * CreateDefaultExecutionParams(RowModifyLevel modLevel,
List *taskList,
TupleDestination *tupleDest,
bool expectResults,
ParamListInfo paramListInfo);
/*
@ -1013,14 +1018,14 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
/*
* ExecuteTaskListIntoTupleDestWithParam is a proxy to ExecuteTaskListExtended() which uses
* bind params from executor state, and with defaults for some of the arguments.
* CreateDefaultExecutionParams returns execution params based on given (possibly null)
* bind params (presumably from executor state) with defaults for some of the arguments.
*/
uint64
ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList,
TupleDestination *tupleDest,
bool expectResults,
ParamListInfo paramListInfo)
static ExecutionParams *
CreateDefaultExecutionParams(RowModifyLevel modLevel, List *taskList,
TupleDestination *tupleDest,
bool expectResults,
ParamListInfo paramListInfo)
{
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
bool localExecutionSupported = true;
@ -1034,6 +1039,24 @@ ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList,
executionParams->tupleDestination = tupleDest;
executionParams->paramListInfo = paramListInfo;
return executionParams;
}
/*
* ExecuteTaskListIntoTupleDestWithParam is a proxy to ExecuteTaskListExtended() which uses
* bind params from executor state, and with defaults for some of the arguments.
*/
uint64
ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList,
TupleDestination *tupleDest,
bool expectResults,
ParamListInfo paramListInfo)
{
ExecutionParams *executionParams = CreateDefaultExecutionParams(modLevel, taskList,
tupleDest,
expectResults,
paramListInfo);
return ExecuteTaskListExtended(executionParams);
}
@ -1047,17 +1070,11 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
TupleDestination *tupleDest,
bool expectResults)
{
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
bool localExecutionSupported = true;
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, taskList, targetPoolSize, localExecutionSupported
);
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, false);
executionParams->expectResults = expectResults;
executionParams->tupleDestination = tupleDest;
ParamListInfo paramListInfo = NULL;
ExecutionParams *executionParams = CreateDefaultExecutionParams(modLevel, taskList,
tupleDest,
expectResults,
paramListInfo);
return ExecuteTaskListExtended(executionParams);
}

View File

@ -143,15 +143,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
targetRelation->partitionColumn);
if (distributionColumnIndex == -1)
{
char *relationName = get_rel_name(targetRelationId);
Oid schemaOid = get_rel_namespace(targetRelationId);
char *schemaName = get_namespace_name(schemaOid);
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg(
"the partition column of table %s should have a value",
quote_qualified_identifier(schemaName,
relationName))));
generate_qualified_relation_name(targetRelationId))));
}
TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList,

View File

@ -67,7 +67,8 @@ PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
/*
* mark_object_distributed adds an object to pg_dist_object
* in all of the nodes.
* in all of the nodes, for the connections to the other nodes this function
* uses the user passed.
*/
Datum
mark_object_distributed(PG_FUNCTION_ARGS)
@ -81,6 +82,8 @@ mark_object_distributed(PG_FUNCTION_ARGS)
Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId);
text *connectionUserText = PG_GETARG_TEXT_P(3);
char *connectionUser = text_to_cstring(connectionUserText);
/*
* This function is called when a query is run from a Citus non-main database.
@ -88,7 +91,8 @@ mark_object_distributed(PG_FUNCTION_ARGS)
* 2PC still works.
*/
bool useConnectionForLocalQuery = true;
MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery);
MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery,
connectionUser);
PG_RETURN_VOID();
}
@ -193,7 +197,8 @@ void
MarkObjectDistributed(const ObjectAddress *distAddress)
{
bool useConnectionForLocalQuery = false;
MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery);
MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery,
CurrentUserName());
}
@ -204,7 +209,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
*/
void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName,
bool useConnectionForLocalQuery)
bool useConnectionForLocalQuery, char *connectionUser)
{
if (!CitusHasBeenLoaded())
{
@ -234,7 +239,8 @@ MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress, objectName);
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
SendCommandToRemoteMetadataNodesParams(workerPgDistObjectUpdateCommand,
connectionUser, 0, NULL, NULL);
}
}

View File

@ -1945,11 +1945,7 @@ ConstructQualifiedShardName(ShardInterval *shardInterval)
static List *
RecreateTableDDLCommandList(Oid relationId)
{
const char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
const char *relationSchemaName = get_namespace_name(relationSchemaId);
const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName,
relationName);
const char *qualifiedRelationName = generate_qualified_relation_name(relationId);
StringInfo dropCommand = makeStringInfo();

View File

@ -433,7 +433,7 @@ CreateTargetEntryForColumn(Form_pg_attribute attributeTuple, Index rteIndex,
attributeTuple->atttypmod, attributeTuple->attcollation, 0);
TargetEntry *targetEntry =
makeTargetEntry((Expr *) targetColumn, resno,
strdup(attributeTuple->attname.data), false);
pstrdup(attributeTuple->attname.data), false);
return targetEntry;
}
@ -449,7 +449,7 @@ CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple, int resno)
attributeTuple->attcollation);
char *resName = attributeTuple->attname.data;
TargetEntry *targetEntry =
makeTargetEntry(nullExpr, resno, strdup(resName), false);
makeTargetEntry(nullExpr, resno, pstrdup(resName), false);
return targetEntry;
}

View File

@ -15,7 +15,7 @@ DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(
);
DROP FUNCTION citus_internal.mark_object_distributed(
classId Oid, objectName text, objectId Oid
classId Oid, objectName text, objectId Oid, connectionUser text
);
DROP FUNCTION citus_internal.commit_management_command_2pc();

View File

@ -1,7 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
IS 'adds an object to pg_dist_object on all nodes';

View File

@ -1,7 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
IS 'adds an object to pg_dist_object on all nodes';

View File

@ -36,10 +36,6 @@
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
static void SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
static void SendBareCommandListToMetadataNodesInternal(List *commandList,
TargetWorkerSet targetWorkerSet);
static void SendCommandToMetadataWorkersParams(const char *command,
@ -209,7 +205,7 @@ SendCommandListToRemoteNodesWithMetadata(List *commands)
* SendCommandToWorkersParamsInternal() that can be used to send commands
* to remote metadata nodes.
*/
static void
void
SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,

View File

@ -170,14 +170,10 @@ WorkerDropDistributedTable(Oid relationId)
*/
if (!IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL))
{
char *relName = get_rel_name(relationId);
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
StringInfo dropCommand = makeStringInfo();
appendStringInfo(dropCommand, "DROP%sTABLE %s CASCADE",
IsForeignTable(relationId) ? " FOREIGN " : " ",
quote_qualified_identifier(schemaName, relName));
generate_qualified_relation_name(relationId));
Node *dropCommandNode = ParseTreeNode(dropCommand->data);

View File

@ -92,38 +92,21 @@ CitusNodeTagI(Node *node)
return ((CitusNode*)(node))->citus_tag;
}
/*
* Postgres's nodes/nodes.h has more information on why we do this.
*/
#ifdef __GNUC__
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ CitusNode *_result; \
AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \
_result = (CitusNode *) palloc0fast(size); \
_result->extensible.type = T_ExtensibleNode; \
_result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \
_result->citus_tag =(int) (tag); \
_result; \
})
static inline CitusNode *
CitusNewNode(size_t size, CitusNodeTag tag)
{
CitusNode *result;
#else
extern CitusNode *newCitusNodeMacroHolder;
#define CitusNewNode(size, tag) \
( \
AssertMacro((size) >= sizeof(CitusNode)), /* need the tag, at least */ \
newCitusNodeMacroHolder = (CitusNode *) palloc0fast(size), \
newCitusNodeMacroHolder->extensible.type = T_ExtensibleNode, \
newCitusNodeMacroHolder->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START], \
newCitusNodeMacroHolder->citus_tag =(int) (tag), \
newCitusNodeMacroHolder \
)
#endif
Assert(size >= sizeof(CitusNode)); /* need the ExtensibleNode and the tag, at least */
result = (CitusNode *) palloc0(size);
result->extensible.type = T_ExtensibleNode;
result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START];
result->citus_tag = (int) (tag);
return result;
}
/*
* IsA equivalent that compares node tags, including Citus-specific nodes.

View File

@ -24,7 +24,8 @@ extern bool IsAnyObjectDistributed(const List *addresses);
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name,
bool useConnectionForLocalQuery);
bool useConnectionForLocalQuery,
char *connectionUser);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address);

View File

@ -68,6 +68,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,
const char *nodeUser, const char *command);
extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort,
const char *nodeUser, const char *command);
extern void SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,

View File

@ -22,7 +22,7 @@ def test_main_commited_outer_not_yet(cluster):
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')"
)
cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)"
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123, 'postgres')"
)
cur2.execute("COMMIT")
@ -133,7 +133,7 @@ def test_main_commited_outer_aborted(cluster):
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')"
)
cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)"
"SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321, 'postgres')"
)
cur2.execute("COMMIT")

View File

@ -1424,7 +1424,7 @@ SELECT * FROM multi_extension.print_extension_changes();
---------------------------------------------------------------------
| function citus_internal.commit_management_command_2pc() void
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
| function citus_internal.mark_object_distributed(oid,text,oid) void
| function citus_internal.mark_object_distributed(oid,text,oid,text) void
| function citus_internal.start_management_transaction(xid8) void
| function citus_internal_acquire_citus_advisory_object_class_lock(integer,cstring) void
| function citus_internal_database_command(text) void

View File

@ -71,6 +71,7 @@ SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerou
ERROR: operation is not allowed
HINT: Run the command with a superuser.
\c other_db1
SET ROLE nonsuperuser;
CREATE USER other_db_user9;
RESET ROLE;
\c regression

View File

@ -59,7 +59,7 @@ ORDER BY 1;
function citus_internal.commit_management_command_2pc()
function citus_internal.execute_command_on_remote_nodes_as_user(text,text)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.mark_object_distributed(oid,text,oid)
function citus_internal.mark_object_distributed(oid,text,oid,text)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
function citus_internal.pg_dist_shard_placement_trigger_func()

View File

@ -51,6 +51,7 @@ SET ROLE nonsuperuser;
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
\c other_db1
SET ROLE nonsuperuser;
CREATE USER other_db_user9;
RESET ROLE;