Merge branch 'main' into 7244/multi_db_connection_improvements

pull/7286/head
Jelte Fennema-Nio 2024-03-21 10:18:01 +01:00 committed by GitHub
commit 46ec3fe4ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 666 additions and 467 deletions

View File

@ -0,0 +1,351 @@
/*-------------------------------------------------------------------------
*
* non_main_db_distribute_object_ops.c
*
* Routines to support node-wide object management commands from non-main
* databases.
*
* RunPreprocessNonMainDBCommand and RunPostprocessNonMainDBCommand are
* the entrypoints for this module. These functions are called from
* utility_hook.c to support some of the node-wide object management
* commands from non-main databases.
*
* To add support for a new command type, one needs to define a new
* NonMainDbDistributeObjectOps object within OperationArray. Also, if
* the command requires marking or unmarking some objects as distributed,
* the necessary operations can be implemented in
* RunPreprocessNonMainDBCommand and RunPostprocessNonMainDBCommand.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xact.h"
#include "catalog/pg_authid_d.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_transaction.h"
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
#define START_MANAGEMENT_TRANSACTION \
"SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
#define UNMARK_OBJECT_DISTRIBUTED \
"SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d, %s)"
/*
* NonMainDbDistributeObjectOps contains the necessary callbacks / flags to
* support node-wide object management commands from non-main databases.
*
* cannotBeExecutedInTransaction:
* Indicates whether the statement cannot be executed in a transaction. If
* this is set to true, the statement will be executed directly on the main
* database because there are no transactional visibility issues for such
* commands.
*
* checkSupportedObjectType:
* Callback function that checks whether type of the object referred to by
* given statement is supported. Can be NULL if not applicable for the
* statement type.
*/
typedef struct NonMainDbDistributeObjectOps
{
bool cannotBeExecutedInTransaction;
bool (*checkSupportedObjectType)(Node *parsetree);
} NonMainDbDistributeObjectOps;
/*
* checkSupportedObjectType callbacks for OperationArray.
*/
static bool CreateDbStmtCheckSupportedObjectType(Node *node);
static bool DropDbStmtCheckSupportedObjectType(Node *node);
static bool GrantStmtCheckSupportedObjectType(Node *node);
static bool SecLabelStmtCheckSupportedObjectType(Node *node);
/*
* OperationArray that holds NonMainDbDistributeObjectOps for different command types.
*/
static const NonMainDbDistributeObjectOps *const OperationArray[] = {
[T_CreateRoleStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = false,
.checkSupportedObjectType = NULL
},
[T_DropRoleStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = false,
.checkSupportedObjectType = NULL
},
[T_AlterRoleStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = false,
.checkSupportedObjectType = NULL
},
[T_GrantRoleStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = false,
.checkSupportedObjectType = NULL
},
[T_CreatedbStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = true,
.checkSupportedObjectType = CreateDbStmtCheckSupportedObjectType
},
[T_DropdbStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = true,
.checkSupportedObjectType = DropDbStmtCheckSupportedObjectType
},
[T_GrantStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = false,
.checkSupportedObjectType = GrantStmtCheckSupportedObjectType
},
[T_SecLabelStmt] = &(NonMainDbDistributeObjectOps) {
.cannotBeExecutedInTransaction = false,
.checkSupportedObjectType = SecLabelStmtCheckSupportedObjectType
},
};
/* other static function declarations */
const NonMainDbDistributeObjectOps * GetNonMainDbDistributeObjectOps(Node *parsetree);
static void CreateRoleStmtMarkDistGloballyOnMainDbs(CreateRoleStmt *createRoleStmt);
static void DropRoleStmtUnmarkDistOnLocalMainDb(DropRoleStmt *dropRoleStmt);
static void MarkObjectDistributedGloballyOnMainDbs(Oid catalogRelId, Oid objectId,
char *objectName);
static void UnmarkObjectDistributedOnLocalMainDb(uint16 catalogRelId, Oid objectId);
/*
* RunPreprocessNonMainDBCommand runs the necessary commands for a query, in main
* database before query is run on the local node with PrevProcessUtility.
*
* Returns true if previous utility hook needs to be skipped after completing
* preprocess phase.
*/
bool
RunPreprocessNonMainDBCommand(Node *parsetree)
{
if (IsMainDB)
{
return false;
}
const NonMainDbDistributeObjectOps *ops = GetNonMainDbDistributeObjectOps(parsetree);
if (!ops)
{
return false;
}
char *queryString = DeparseTreeNode(parsetree);
/*
* For the commands that cannot be executed in a transaction, there are no
* transactional visibility issues. We directly route them to main database
* so that we only have to consider one code-path for such commands.
*/
if (ops->cannotBeExecutedInTransaction)
{
IsMainDBCommandInXact = false;
RunCitusMainDBQuery((char *) queryString);
return true;
}
IsMainDBCommandInXact = true;
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
START_MANAGEMENT_TRANSACTION,
GetCurrentFullTransactionId().value);
RunCitusMainDBQuery(mainDBQuery->data);
mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
quote_literal_cstr(queryString),
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
if (IsA(parsetree, DropRoleStmt))
{
DropRoleStmtUnmarkDistOnLocalMainDb((DropRoleStmt *) parsetree);
}
return false;
}
/*
* RunPostprocessNonMainDBCommand runs the necessary commands for a query, in main
* database after query is run on the local node with PrevProcessUtility.
*/
void
RunPostprocessNonMainDBCommand(Node *parsetree)
{
if (IsMainDB || !GetNonMainDbDistributeObjectOps(parsetree))
{
return;
}
if (IsA(parsetree, CreateRoleStmt))
{
CreateRoleStmtMarkDistGloballyOnMainDbs((CreateRoleStmt *) parsetree);
}
}
/*
* GetNonMainDbDistributeObjectOps returns the NonMainDbDistributeObjectOps for given
* command if it's node-wide object management command that's supported from non-main
* databases.
*/
const NonMainDbDistributeObjectOps *
GetNonMainDbDistributeObjectOps(Node *parsetree)
{
NodeTag tag = nodeTag(parsetree);
if (tag >= lengthof(OperationArray))
{
return NULL;
}
const NonMainDbDistributeObjectOps *ops = OperationArray[tag];
if (ops == NULL)
{
return NULL;
}
if (!ops->checkSupportedObjectType ||
ops->checkSupportedObjectType(parsetree))
{
return ops;
}
return NULL;
}
/*
* CreateRoleStmtMarkDistGloballyOnMainDbs marks the role as
* distributed on all main databases globally.
*/
static void
CreateRoleStmtMarkDistGloballyOnMainDbs(CreateRoleStmt *createRoleStmt)
{
/* object must exist as we've just created it */
bool missingOk = false;
Oid roleId = get_role_oid(createRoleStmt->role, missingOk);
MarkObjectDistributedGloballyOnMainDbs(AuthIdRelationId, roleId,
createRoleStmt->role);
}
/*
* DropRoleStmtUnmarkDistOnLocalMainDb unmarks the roles as
* distributed on the local main database.
*/
static void
DropRoleStmtUnmarkDistOnLocalMainDb(DropRoleStmt *dropRoleStmt)
{
RoleSpec *roleSpec = NULL;
foreach_ptr(roleSpec, dropRoleStmt->roles)
{
Oid roleOid = get_role_oid(roleSpec->rolename,
dropRoleStmt->missing_ok);
if (roleOid == InvalidOid)
{
continue;
}
UnmarkObjectDistributedOnLocalMainDb(AuthIdRelationId, roleOid);
}
}
/*
* MarkObjectDistributedGloballyOnMainDbs marks an object as
* distributed on all main databases globally.
*/
static void
MarkObjectDistributedGloballyOnMainDbs(Oid catalogRelId, Oid objectId, char *objectName)
{
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
catalogRelId,
quote_literal_cstr(objectName),
objectId,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
}
/*
* UnmarkObjectDistributedOnLocalMainDb unmarks an object as
* distributed on the local main database.
*/
static void
UnmarkObjectDistributedOnLocalMainDb(uint16 catalogRelId, Oid objectId)
{
const int subObjectId = 0;
const char *checkObjectExistence = "false";
StringInfo query = makeStringInfo();
appendStringInfo(query,
UNMARK_OBJECT_DISTRIBUTED,
catalogRelId, objectId,
subObjectId, checkObjectExistence);
RunCitusMainDBQuery(query->data);
}
/*
* checkSupportedObjectTypes callbacks for OperationArray lie below.
*/
static bool
CreateDbStmtCheckSupportedObjectType(Node *node)
{
/*
* We don't try to send the query to the main database if the CREATE
* DATABASE command is for the main database itself, this is a very
* rare case but it's exercised by our test suite.
*/
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
return strcmp(stmt->dbname, MainDb) != 0;
}
static bool
DropDbStmtCheckSupportedObjectType(Node *node)
{
/*
* We don't try to send the query to the main database if the DROP
* DATABASE command is for the main database itself, this is a very
* rare case but it's exercised by our test suite.
*/
DropdbStmt *stmt = castNode(DropdbStmt, node);
return strcmp(stmt->dbname, MainDb) != 0;
}
static bool
GrantStmtCheckSupportedObjectType(Node *node)
{
GrantStmt *stmt = castNode(GrantStmt, node);
return stmt->objtype == OBJECT_DATABASE;
}
static bool
SecLabelStmtCheckSupportedObjectType(Node *node)
{
SecLabelStmt *stmt = castNode(SecLabelStmt, node);
return stmt->objtype == OBJECT_ROLE;
}

View File

@ -491,18 +491,17 @@ GenerateRoleOptionsList(HeapTuple tuple)
options = lappend(options, makeDefElem("password", NULL, -1));
}
/* load valid unitl data from the heap tuple, use default of infinity if not set */
/* load valid until data from the heap tuple */
Datum rolValidUntilDatum = SysCacheGetAttr(AUTHNAME, tuple,
Anum_pg_authid_rolvaliduntil, &isNull);
char *rolValidUntil = "infinity";
if (!isNull)
{
rolValidUntil = pstrdup((char *) timestamptz_to_str(rolValidUntilDatum));
}
char *rolValidUntil = pstrdup((char *) timestamptz_to_str(rolValidUntilDatum));
Node *validUntilStringNode = (Node *) makeString(rolValidUntil);
DefElem *validUntilOption = makeDefElem("validUntil", validUntilStringNode, -1);
options = lappend(options, validUntilOption);
Node *validUntilStringNode = (Node *) makeString(rolValidUntil);
DefElem *validUntilOption = makeDefElem("validUntil", validUntilStringNode, -1);
options = lappend(options, validUntilOption);
}
return options;
}

View File

@ -3053,11 +3053,15 @@ ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *command,
else if (constraint->contype == CONSTR_FOREIGN)
{
RangeVar *referencedTable = constraint->pktable;
char *referencedColumn = strVal(lfirst(list_head(constraint->pk_attrs)));
Oid referencedRelationId = RangeVarGetRelid(referencedTable, NoLock, false);
appendStringInfo(errHint, "FOREIGN KEY (%s) REFERENCES %s(%s)", colName,
get_rel_name(referencedRelationId), referencedColumn);
appendStringInfo(errHint, "FOREIGN KEY (%s) REFERENCES %s", colName,
get_rel_name(referencedRelationId));
if (list_length(constraint->pk_attrs) > 0)
{
AppendColumnNameList(errHint, constraint->pk_attrs);
}
if (constraint->fk_del_action == FKCONSTR_ACTION_SETNULL)
{

View File

@ -87,69 +87,6 @@
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
#define START_MANAGEMENT_TRANSACTION \
"SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
#define UNMARK_OBJECT_DISTRIBUTED \
"SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d,%s)"
/* see NonMainDbDistributedStatementInfo for the explanation of these flags */
typedef enum DistObjectOperation
{
NO_DIST_OBJECT_OPERATION,
MARK_DISTRIBUTED_GLOBALLY,
UNMARK_DISTRIBUTED_LOCALLY
} DistObjectOperation;
/*
* NonMainDbDistributedStatementInfo is used to determine whether a statement is
* supported from non-main databases and whether it should be marked or unmarked
* as distributed.
*
* When creating a distributed object, we always have to mark such objects as
* "distributed" but while for some object types we can delegate this to main
* database, for some others we have to explicitly send a command to all nodes
* in this code-path to achieve this. Callers need to provide
* MARK_DISTRIBUTED_GLOBALLY when that is the case.
*
* Similarly when dropping a distributed object, we always have to unmark such
* objects as "distributed" and our utility hook on remote nodes achieve this
* via UnmarkNodeWideObjectsDistributed() because the commands that we send to
* workers are executed via main db. However for the local node, this is not the
* case as we're not in the main db. For this reason, callers need to provide
* UNMARK_DISTRIBUTED_LOCALLY to unmark an object for local node as well.
*/
typedef struct NonMainDbDistributedStatementInfo
{
int statementType;
DistObjectOperation DistObjectOperation;
/*
* checkSupportedObjectTypes is a callback function that checks whether
* type of the object referred to by given statement is supported.
*
* Can be NULL if not applicable for the statement type.
*/
bool (*checkSupportedObjectTypes)(Node *node);
} NonMainDbDistributedStatementInfo;
/*
* DistObjectOperationParams is used to pass parameters to the
* MarkObjectDistributedGloballyFromNonMainDb function and
* UnMarkObjectDistributedLocallyFromNonMainDb functions.
*/
typedef struct DistObjectOperationParams
{
char *name;
Oid id;
uint16 catalogRelId;
} DistObjectOperationParams;
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
@ -179,46 +116,6 @@ static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
/*
* Functions to support commands used to manage node-wide objects from non-main
* databases.
*/
static bool IsCommandToCreateOrDropMainDB(Node *parsetree);
static void RunPreprocessMainDBCommand(Node *parsetree);
static void RunPostprocessMainDBCommand(Node *parsetree);
static bool IsStatementSupportedFromNonMainDb(Node *parsetree);
static bool StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree);
static bool StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree);
static void MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree);
static void UnMarkObjectDistributedLocallyFromNonMainDb(List *unmarkDistributedList);
static List * GetDistObjectOperationParams(Node *parsetree);
/*
* checkSupportedObjectTypes callbacks for
* NonMainDbDistributedStatementInfo objects.
*/
static bool NonMainDbCheckSupportedObjectTypeForGrant(Node *node);
static bool NonMainDbCheckSupportedObjectTypeForSecLabel(Node *node);
/*
* NonMainDbSupportedStatements is an array of statements that are supported
* from non-main databases.
*/
ObjectType supportedObjectTypesForGrantStmt[] = { OBJECT_DATABASE };
static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] = {
{ T_GrantRoleStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_CreateRoleStmt, MARK_DISTRIBUTED_GLOBALLY, NULL },
{ T_DropRoleStmt, UNMARK_DISTRIBUTED_LOCALLY, NULL },
{ T_AlterRoleStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_GrantStmt, NO_DIST_OBJECT_OPERATION, NonMainDbCheckSupportedObjectTypeForGrant },
{ T_CreatedbStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_DropdbStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_SecLabelStmt, NO_DIST_OBJECT_OPERATION,
NonMainDbCheckSupportedObjectTypeForSecLabel },
};
/*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
* pieces of a utility statement before invoking ProcessUtility.
@ -350,36 +247,25 @@ citus_ProcessUtility(PlannedStmt *pstmt,
if (!CitusHasBeenLoaded())
{
/*
* We always execute CREATE/DROP DATABASE from the main database. There are no
* transactional visibility issues, since these commands are non-transactional.
* And this way we only have to consider one codepath when creating databases.
* We don't try to send the query to the main database if the CREATE/DROP DATABASE
* command is for the main database itself, this is a very rare case but it's
* exercised by our test suite.
* Process the command via RunPreprocessNonMainDBCommand and
* RunPostprocessNonMainDBCommand hooks if we're in a non-main database
* and if the command is a node-wide object management command that we
* support from non-main databases.
*/
if (!IsMainDB &&
!IsCommandToCreateOrDropMainDB(parsetree))
{
RunPreprocessMainDBCommand(parsetree);
if (IsA(parsetree, CreatedbStmt) ||
IsA(parsetree, DropdbStmt))
{
return;
}
bool shouldSkipPrevUtilityHook = RunPreprocessNonMainDBCommand(parsetree);
if (!shouldSkipPrevUtilityHook)
{
/*
* Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked.
*/
PrevProcessUtility(pstmt, queryString, false, context,
params, queryEnv, dest, completionTag);
}
/*
* Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked.
*/
PrevProcessUtility(pstmt, queryString, false, context,
params, queryEnv, dest, completionTag);
if (!IsMainDB)
{
RunPostprocessMainDBCommand(parsetree);
}
RunPostprocessNonMainDBCommand(parsetree);
return;
}
@ -1715,281 +1601,3 @@ DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}
/*
* IsCommandToCreateOrDropMainDB checks if this query creates or drops the
* main database, so we can make an exception and not send this query to
* the main database.
*/
static bool
IsCommandToCreateOrDropMainDB(Node *parsetree)
{
if (IsA(parsetree, CreatedbStmt))
{
CreatedbStmt *createdbStmt = castNode(CreatedbStmt, parsetree);
return strcmp(createdbStmt->dbname, MainDb) == 0;
}
else if (IsA(parsetree, DropdbStmt))
{
DropdbStmt *dropdbStmt = castNode(DropdbStmt, parsetree);
return strcmp(dropdbStmt->dbname, MainDb) == 0;
}
return false;
}
/*
* RunPreprocessMainDBCommand runs the necessary commands for a query, in main
* database before query is run on the local node with PrevProcessUtility
*/
static void
RunPreprocessMainDBCommand(Node *parsetree)
{
if (!IsStatementSupportedFromNonMainDb(parsetree))
{
return;
}
char *queryString = DeparseTreeNode(parsetree);
if (IsA(parsetree, CreatedbStmt) ||
IsA(parsetree, DropdbStmt))
{
IsMainDBCommandInXact = false;
RunCitusMainDBQuery((char *) queryString);
return;
}
IsMainDBCommandInXact = true;
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
START_MANAGEMENT_TRANSACTION,
GetCurrentFullTransactionId().value);
RunCitusMainDBQuery(mainDBQuery->data);
mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
quote_literal_cstr(queryString),
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
if (StatementRequiresUnmarkDistributedLocallyFromNonMainDb(parsetree))
{
List *unmarkParams = GetDistObjectOperationParams(parsetree);
UnMarkObjectDistributedLocallyFromNonMainDb(unmarkParams);
}
}
/*
* RunPostprocessMainDBCommand runs the necessary commands for a query, in main
* database after query is run on the local node with PrevProcessUtility
*/
static void
RunPostprocessMainDBCommand(Node *parsetree)
{
if (IsStatementSupportedFromNonMainDb(parsetree) &&
StatementRequiresMarkDistributedGloballyFromNonMainDb(parsetree))
{
MarkObjectDistributedGloballyFromNonMainDb(parsetree);
}
}
/*
* IsStatementSupportedFromNonMainDb returns true if the statement is supported from a
* non-main database.
*/
static bool
IsStatementSupportedFromNonMainDb(Node *parsetree)
{
NodeTag type = nodeTag(parsetree);
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(NonMainDbSupportedStatements[0]); i++)
{
if (type != NonMainDbSupportedStatements[i].statementType)
{
continue;
}
return !NonMainDbSupportedStatements[i].checkSupportedObjectTypes ||
NonMainDbSupportedStatements[i].checkSupportedObjectTypes(parsetree);
}
return false;
}
/*
* StatementRequiresMarkDistributedGloballyFromNonMainDb returns true if the statement should be marked
* as distributed when executed from a non-main database.
*/
static bool
StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree)
{
NodeTag type = nodeTag(parsetree);
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(NonMainDbSupportedStatements[0]); i++)
{
if (type == NonMainDbSupportedStatements[i].statementType)
{
return NonMainDbSupportedStatements[i].DistObjectOperation ==
MARK_DISTRIBUTED_GLOBALLY;
}
}
return false;
}
/*
* StatementRequiresUnmarkDistributedLocallyFromNonMainDb returns true if the statement should be unmarked
* as distributed when executed from a non-main database.
*/
static bool
StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree)
{
NodeTag type = nodeTag(parsetree);
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(NonMainDbSupportedStatements[0]); i++)
{
if (type == NonMainDbSupportedStatements[i].statementType)
{
return NonMainDbSupportedStatements[i].DistObjectOperation ==
UNMARK_DISTRIBUTED_LOCALLY;
}
}
return false;
}
/*
* MarkObjectDistributedGloballyFromNonMainDb marks the given object as distributed on the
* non-main database.
*/
static void
MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree)
{
List *distObjectOperationParams =
GetDistObjectOperationParams(parsetree);
DistObjectOperationParams *distObjectOperationParam = NULL;
foreach_ptr(distObjectOperationParam, distObjectOperationParams)
{
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
distObjectOperationParam->catalogRelId,
quote_literal_cstr(distObjectOperationParam->name),
distObjectOperationParam->id,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
}
}
/*
* UnMarkObjectDistributedLocallyFromNonMainDb unmarks the given object as distributed on the
* non-main database.
*/
static void
UnMarkObjectDistributedLocallyFromNonMainDb(List *markObjectDistributedParamList)
{
DistObjectOperationParams *markObjectDistributedParam = NULL;
int subObjectId = 0;
char *checkObjectExistence = "false";
foreach_ptr(markObjectDistributedParam, markObjectDistributedParamList)
{
StringInfo query = makeStringInfo();
appendStringInfo(query,
UNMARK_OBJECT_DISTRIBUTED,
AuthIdRelationId,
markObjectDistributedParam->id,
subObjectId, checkObjectExistence);
RunCitusMainDBQuery(query->data);
}
}
/*
* GetDistObjectOperationParams returns DistObjectOperationParams for the target
* object of given parsetree.
*/
List *
GetDistObjectOperationParams(Node *parsetree)
{
List *paramsList = NIL;
if (IsA(parsetree, CreateRoleStmt))
{
CreateRoleStmt *stmt = castNode(CreateRoleStmt, parsetree);
DistObjectOperationParams *params =
(DistObjectOperationParams *) palloc(sizeof(DistObjectOperationParams));
params->name = stmt->role;
params->catalogRelId = AuthIdRelationId;
params->id = get_role_oid(stmt->role, false);
paramsList = lappend(paramsList, params);
}
else if (IsA(parsetree, DropRoleStmt))
{
DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree);
RoleSpec *roleSpec;
foreach_ptr(roleSpec, stmt->roles)
{
DistObjectOperationParams *params = (DistObjectOperationParams *) palloc(
sizeof(DistObjectOperationParams));
Oid roleOid = get_role_oid(roleSpec->rolename, true);
if (roleOid == InvalidOid)
{
continue;
}
params->id = roleOid;
params->name = roleSpec->rolename;
params->catalogRelId = AuthIdRelationId;
paramsList = lappend(paramsList, params);
}
}
else
{
elog(ERROR, "unsupported statement type");
}
return paramsList;
}
/*
* NonMainDbCheckSupportedObjectTypeForGrant implements checkSupportedObjectTypes
* callback for GrantStmt.
*/
static bool
NonMainDbCheckSupportedObjectTypeForGrant(Node *node)
{
GrantStmt *stmt = castNode(GrantStmt, node);
return stmt->objtype == OBJECT_DATABASE;
}
/*
* NonMainDbCheckSupportedObjectTypeForSecLabel implements checkSupportedObjectTypes
* callback for SecLabel.
*/
static bool
NonMainDbCheckSupportedObjectTypeForSecLabel(Node *node)
{
SecLabelStmt *stmt = castNode(SecLabelStmt, node);
return stmt->objtype == OBJECT_ROLE;
}

View File

@ -121,7 +121,7 @@ AppendAlterTableStmt(StringInfo buf, AlterTableStmt *stmt)
* AppendColumnNameList converts a list of columns into comma separated string format
* (colname_1, colname_2, .., colname_n).
*/
static void
void
AppendColumnNameList(StringInfo buf, List *columns)
{
appendStringInfoString(buf, " (");

View File

@ -492,19 +492,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
bool
ClusterHasKnownMetadataWorkers()
{
bool workerWithMetadata = false;
if (!IsCoordinator())
{
workerWithMetadata = true;
}
if (workerWithMetadata || HasMetadataWorkers())
{
return true;
}
return false;
return !IsCoordinator() || HasMetadataWorkers();
}

View File

@ -91,6 +91,10 @@ bool InDelegatedFunctionCall = false;
static bool
contain_param_walker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}
if (IsA(node, Param))
{
Param *paramNode = (Param *) node;

View File

@ -50,6 +50,13 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
* so we are using first primary worker node just for test purposes.
*/
WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode();
if (dummyWorkerNode == NULL)
{
ereport(ERROR, (errmsg("no worker nodes found"),
errdetail("Function activate_node_snapshot is meant to be "
"used when running tests on a multi-node cluster "
"with workers.")));
}
/*
* Create MetadataSyncContext which is used throughout nodes' activation.

View File

@ -34,6 +34,7 @@
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/xid8.h"
#include "pg_version_constants.h"
@ -261,11 +262,28 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
continue;
}
/* Check if the transaction is created by an outer transaction from a non-main database */
bool outerXidIsNull = false;
Datum outerXidDatum = heap_getattr(heapTuple,
Anum_pg_dist_transaction_outerxid,
tupleDescriptor, &outerXidIsNull);
Datum outerXidDatum = 0;
if (EnableVersionChecks ||
SearchSysCacheExistsAttName(DistTransactionRelationId(), "outer_xid"))
{
/* Check if the transaction is created by an outer transaction from a non-main database */
outerXidDatum = heap_getattr(heapTuple,
Anum_pg_dist_transaction_outerxid,
tupleDescriptor, &outerXidIsNull);
}
else
{
/*
* Normally we don't try to recover prepared transactions when the
* binary version doesn't match the sql version. However, we skip
* those checks in regression tests by disabling
* citus.enable_version_checks. And when this is the case, while
* the C code looks for "outer_xid" attribute, pg_dist_transaction
* doesn't yet have it.
*/
Assert(!EnableVersionChecks);
}
TransactionId outerXid = 0;
if (!outerXidIsNull)

View File

@ -707,13 +707,27 @@ SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode)
}
List *replicatedShardList = NIL;
if (AnyTableReplicated(shardIntervalList, &replicatedShardList))
{
if (ClusterHasKnownMetadataWorkers() && !IsFirstWorkerNode())
{
LockShardListResourcesOnFirstWorker(lockMode, replicatedShardList);
}
bool anyTableReplicated = AnyTableReplicated(shardIntervalList, &replicatedShardList);
/*
* Acquire locks on the modified table.
* If the table is replicated, the locks are first acquired on the first worker node then locally.
* But if we're already on the first worker, acquiring on the first worker node and locally are the same operation.
* So we only acquire locally in that case.
*/
if (anyTableReplicated && ClusterHasKnownMetadataWorkers() && !IsFirstWorkerNode())
{
LockShardListResourcesOnFirstWorker(lockMode, replicatedShardList);
}
LockShardListResources(shardIntervalList, lockMode);
/*
* Next, acquire locks on the reference tables that are referenced by a foreign key if there are any.
* Note that LockReferencedReferenceShardResources() first acquires locks on the first worker,
* then locally.
*/
if (anyTableReplicated)
{
ShardInterval *firstShardInterval =
(ShardInterval *) linitial(replicatedShardList);
if (ReferenceTableShardId(firstShardInterval->shardId))
@ -728,8 +742,6 @@ SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode)
LockReferencedReferenceShardResources(firstShardInterval->shardId, lockMode);
}
}
LockShardListResources(shardIntervalList, lockMode);
}

View File

@ -104,6 +104,10 @@ typedef struct DistributeObjectOps
const DistributeObjectOps * GetDistributeObjectOps(Node *node);
/* functions to support node-wide object management commands from non-main dbs */
extern bool RunPreprocessNonMainDBCommand(Node *parsetree);
extern void RunPostprocessNonMainDBCommand(Node *parsetree);
/*
* Flags that can be passed to GetForeignKeyOids to indicate
* which foreign key constraint OIDs are to be extracted

View File

@ -121,6 +121,8 @@ extern void AppendGrantedByInGrant(StringInfo buf, GrantStmt *stmt);
extern void AppendGrantSharedPrefix(StringInfo buf, GrantStmt *stmt);
extern void AppendGrantSharedSuffix(StringInfo buf, GrantStmt *stmt);
extern void AppendColumnNameList(StringInfo buf, List *columns);
/* Common deparser utils */
typedef struct DefElemOptionFormat

View File

@ -44,6 +44,15 @@ ERROR: cannot execute ADD COLUMN command with PRIMARY KEY, UNIQUE, FOREIGN and
DETAIL: Adding a column with a constraint in one command is not supported because all constraints in Citus must have explicit names
HINT: You can issue each command separately such as ALTER TABLE referencing ADD COLUMN test_8 data_type; ALTER TABLE referencing ADD CONSTRAINT constraint_name CHECK (check_expression);
ALTER TABLE referencing ADD COLUMN test_8 integer CONSTRAINT check_test_8 CHECK (test_8 > 0);
-- error out properly even if the REFERENCES does not include the column list of the referenced table
ALTER TABLE referencing ADD COLUMN test_9 bool, ADD COLUMN test_10 int REFERENCES referenced;
ERROR: cannot execute ADD COLUMN command with PRIMARY KEY, UNIQUE, FOREIGN and CHECK constraints
DETAIL: Adding a column with a constraint in one command is not supported because all constraints in Citus must have explicit names
HINT: You can issue each command separately such as ALTER TABLE referencing ADD COLUMN test_10 data_type; ALTER TABLE referencing ADD CONSTRAINT constraint_name FOREIGN KEY (test_10) REFERENCES referenced;
ALTER TABLE referencing ADD COLUMN test_9 bool, ADD COLUMN test_10 int REFERENCES referenced(int_col);
ERROR: cannot execute ADD COLUMN command with PRIMARY KEY, UNIQUE, FOREIGN and CHECK constraints
DETAIL: Adding a column with a constraint in one command is not supported because all constraints in Citus must have explicit names
HINT: You can issue each command separately such as ALTER TABLE referencing ADD COLUMN test_10 data_type; ALTER TABLE referencing ADD CONSTRAINT constraint_name FOREIGN KEY (test_10) REFERENCES referenced (int_col );
-- try to add test_6 again, but with IF NOT EXISTS
ALTER TABLE referencing ADD COLUMN IF NOT EXISTS test_6 text;
NOTICE: column "test_6" of relation "referencing" already exists, skipping

View File

@ -121,17 +121,17 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin, rolreplication, rolbypassrls, rolconnlimit, (rolpassword != '') as pass_not_empty, rolvaliduntil FROM pg_authid WHERE rolname LIKE 'create\_%' ORDER BY rolname;
rolname | rolsuper | rolinherit | rolcreaterole | rolcreatedb | rolcanlogin | rolreplication | rolbypassrls | rolconnlimit | pass_not_empty | rolvaliduntil
---------------------------------------------------------------------
create_group | f | t | f | f | f | f | f | -1 | | infinity
create_group_2 | f | t | f | f | f | f | f | -1 | | infinity
create_role | f | t | f | f | f | f | f | -1 | | infinity
create_role"edge | f | t | f | f | f | f | f | -1 | | infinity
create_role'edge | f | t | f | f | f | f | f | -1 | | infinity
create_role_2 | f | t | f | f | f | f | f | -1 | | infinity
create_role_sysid | f | t | f | f | f | f | f | -1 | | infinity
create_group | f | t | f | f | f | f | f | -1 | |
create_group_2 | f | t | f | f | f | f | f | -1 | |
create_role | f | t | f | f | f | f | f | -1 | |
create_role"edge | f | t | f | f | f | f | f | -1 | |
create_role'edge | f | t | f | f | f | f | f | -1 | |
create_role_2 | f | t | f | f | f | f | f | -1 | |
create_role_sysid | f | t | f | f | f | f | f | -1 | |
create_role_with_everything | t | t | t | t | t | t | t | 105 | t | Thu May 04 17:00:00 2045 PDT
create_role_with_nothing | f | f | f | f | f | f | f | 3 | t | Mon May 04 17:00:00 2015 PDT
create_user | f | t | f | f | t | f | f | -1 | | infinity
create_user_2 | f | t | f | f | t | f | f | -1 | | infinity
create_user | f | t | f | f | t | f | f | -1 | |
create_user_2 | f | t | f | f | t | f | f | -1 | |
(11 rows)
SELECT roleid::regrole::text AS role, member::regrole::text, grantor::regrole::text, admin_option FROM pg_auth_members WHERE roleid::regrole::text LIKE 'create\_%' ORDER BY 1, 2;

View File

@ -0,0 +1,32 @@
CREATE SCHEMA function_with_case;
SET search_path TO function_with_case;
-- create function
CREATE OR REPLACE FUNCTION test_err(v1 text)
RETURNS text
LANGUAGE plpgsql
SECURITY DEFINER
AS $function$
begin
return v1 || ' - ok';
END;
$function$;
do $$ declare
lNewValues text;
val text;
begin
val = 'test';
lNewValues = test_err(v1 => case when val::text = 'test'::text then 'yes' else 'no' end);
raise notice 'lNewValues= %', lNewValues;
end;$$ ;
NOTICE: lNewValues= yes - ok
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
-- call function
SELECT test_err('test');
test_err
---------------------------------------------------------------------
test - ok
(1 row)
DROP SCHEMA function_with_case CASCADE;
NOTICE: drop cascades to function test_err(text)

View File

@ -0,0 +1,62 @@
--- Test for updating a table that has a foreign key reference to another reference table.
--- Issue #7477: Distributed deadlock after issuing a simple UPDATE statement
--- https://github.com/citusdata/citus/issues/7477
CREATE TABLE table1 (id INT PRIMARY KEY);
SELECT create_reference_table('table1');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1 VALUES (1);
CREATE TABLE table2 (
id INT,
info TEXT,
CONSTRAINT table1_id_fk FOREIGN KEY (id) REFERENCES table1 (id)
);
SELECT create_reference_table('table2');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table2 VALUES (1, 'test');
--- Runs the update command in parallel on workers.
--- Due to bug #7477, before the fix, the result is non-deterministic
--- and have several rows of the form:
--- localhost | 57638 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: canceling the transaction since it was involved in a distributed deadlock
SELECT * FROM master_run_on_worker(
ARRAY['localhost', 'localhost','localhost', 'localhost','localhost',
'localhost','localhost', 'localhost','localhost', 'localhost']::text[],
ARRAY[57638, 57637, 57637, 57638, 57637, 57638, 57637, 57638, 57638, 57637]::int[],
ARRAY['UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1'
]::text[],
true);
node_name | node_port | success | result
---------------------------------------------------------------------
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57638 | t | UPDATE 1
localhost | 57637 | t | UPDATE 1
(10 rows)
--- cleanup
DROP TABLE table2;
DROP TABLE table1;

View File

@ -188,7 +188,6 @@ select 1 from citus_add_node('localhost', :worker_2_port);
1
(1 row)
-- XXX: date is not correct on one of the workers due to https://github.com/citusdata/citus/issues/7533
select result FROM run_command_on_all_nodes($$
SELECT array_to_json(array_agg(row_to_json(t)))
FROM (
@ -200,11 +199,11 @@ select result FROM run_command_on_all_nodes($$
ORDER BY rolname
) t
$$);
result
result
---------------------------------------------------------------------
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":null},{"rolname":"test_role2-needs\\!escape","rolsuper":true,"rolinherit":true,"rolcreaterole":true,"rolcreatedb":true,"rolcanlogin":true,"rolreplication":true,"rolbypassrls":true,"rolconnlimit":10,"pass_not_empty":null,"date":"2023-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":null}]
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":null},{"rolname":"test_role2-needs\\!escape","rolsuper":true,"rolinherit":true,"rolcreaterole":true,"rolcreatedb":true,"rolcanlogin":true,"rolreplication":true,"rolbypassrls":true,"rolconnlimit":10,"pass_not_empty":null,"date":"2023-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":null}]
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":"infinity"},{"rolname":"test_role2-needs\\!escape","rolsuper":true,"rolinherit":true,"rolcreaterole":true,"rolcreatedb":true,"rolcanlogin":true,"rolreplication":true,"rolbypassrls":true,"rolconnlimit":10,"pass_not_empty":null,"date":"2023-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":"infinity"}]
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":null},{"rolname":"test_role2-needs\\!escape","rolsuper":true,"rolinherit":true,"rolcreaterole":true,"rolcreatedb":true,"rolcanlogin":true,"rolreplication":true,"rolbypassrls":true,"rolconnlimit":10,"pass_not_empty":null,"date":"2023-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":null}]
(3 rows)
--test for alter user
@ -229,7 +228,6 @@ select 1 from citus_add_node('localhost', :worker_2_port);
1
(1 row)
-- XXX: date is not correct on one of the workers due to https://github.com/citusdata/citus/issues/7533
select result FROM run_command_on_all_nodes($$
SELECT array_to_json(array_agg(row_to_json(t)))
FROM (
@ -241,11 +239,11 @@ select result FROM run_command_on_all_nodes($$
ORDER BY rolname
) t
$$);
result
result
---------------------------------------------------------------------
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":null},{"rolname":"test_role2-needs\\!escape","rolsuper":false,"rolinherit":false,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":5,"pass_not_empty":null,"date":"2024-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":null}]
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":null},{"rolname":"test_role2-needs\\!escape","rolsuper":false,"rolinherit":false,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":5,"pass_not_empty":null,"date":"2024-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":null}]
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":"infinity"},{"rolname":"test_role2-needs\\!escape","rolsuper":false,"rolinherit":false,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":5,"pass_not_empty":null,"date":"2024-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":"infinity"}]
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":null},{"rolname":"test_role2-needs\\!escape","rolsuper":false,"rolinherit":false,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":5,"pass_not_empty":null,"date":"2024-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":null}]
(3 rows)
--test for drop user
@ -266,7 +264,6 @@ select 1 from citus_add_node('localhost', :worker_2_port);
1
(1 row)
-- XXX: date is not correct on one of the workers due to https://github.com/citusdata/citus/issues/7533
select result FROM run_command_on_all_nodes($$
SELECT array_to_json(array_agg(row_to_json(t)))
FROM (
@ -278,11 +275,11 @@ select result FROM run_command_on_all_nodes($$
ORDER BY rolname
) t
$$);
result
result
---------------------------------------------------------------------
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":"infinity"},{"rolname":"test_role2-needs\\!escape","rolsuper":false,"rolinherit":false,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":5,"pass_not_empty":null,"date":"2024-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":"infinity"}]
[{"rolname":"test_role1","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":true,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":true,"date":null},{"rolname":"test_role2-needs\\!escape","rolsuper":false,"rolinherit":false,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":5,"pass_not_empty":null,"date":"2024-01-01"},{"rolname":"test_role3","rolsuper":false,"rolinherit":true,"rolcreaterole":false,"rolcreatedb":false,"rolcanlogin":false,"rolreplication":false,"rolbypassrls":false,"rolconnlimit":-1,"pass_not_empty":null,"date":null}]
(3 rows)
-- Clean up: drop the database on worker node 2

View File

@ -167,9 +167,9 @@ SELECT node_type, result FROM get_citus_tests_label_provider_labels('"user 2"')
SET citus.log_remote_commands TO on;
SET citus.grep_remote_commands = '%SECURITY LABEL%';
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
NOTICE: issuing SELECT worker_create_or_alter_role('user1', 'CREATE ROLE user1 NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL VALID UNTIL ''infinity''', 'ALTER ROLE user1 NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL VALID UNTIL ''infinity''');SECURITY LABEL FOR "citus '!tests_label_provider" ON ROLE user1 IS 'citus_classified'
NOTICE: issuing SELECT worker_create_or_alter_role('user1', 'CREATE ROLE user1 NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL', 'ALTER ROLE user1 NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL');SECURITY LABEL FOR "citus '!tests_label_provider" ON ROLE user1 IS 'citus_classified'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_create_or_alter_role('user 2', 'CREATE ROLE "user 2" NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL VALID UNTIL ''infinity''', 'ALTER ROLE "user 2" NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL VALID UNTIL ''infinity''');SECURITY LABEL FOR "citus '!tests_label_provider" ON ROLE "user 2" IS 'citus ''!unclassified'
NOTICE: issuing SELECT worker_create_or_alter_role('user 2', 'CREATE ROLE "user 2" NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL', 'ALTER ROLE "user 2" NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT -1 PASSWORD NULL');SECURITY LABEL FOR "citus '!tests_label_provider" ON ROLE "user 2" IS 'citus ''!unclassified'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
?column?
---------------------------------------------------------------------

View File

@ -67,6 +67,20 @@ SELECT 1 FROM run_command_on_workers($$SELECT pg_reload_conf()$$);
1
(2 rows)
-- In the version that we use for upgrade tests (v10.2.0), we propagate
-- "valid until" to the workers as "infinity" even if it's not set. And
-- given that "postgres" role is created in the older version, "valid until"
-- is set to "infinity" on the workers while this is not the case for
-- coordinator. See https://github.com/citusdata/citus/issues/7533.
--
-- We're fixing this for new versions of Citus and we'll probably backport
-- this to some older versions too. However, v10.2.0 won't ever have this
-- fix.
--
-- For this reason, here we set "valid until" to "infinity" for all the
-- nodes so that below query doesn't report any difference between the
-- metadata on coordinator and workers.
ALTER ROLE postgres WITH VALID UNTIL 'infinity';
-- make sure that the metadata is consistent across all nodes
-- we exclude the distributed_object_data as they are
-- not sorted in the same order (as OIDs differ on the nodes)

View File

@ -103,13 +103,14 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
test: binary_protocol
test: alter_table_set_access_method
test: alter_distributed_table
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477
test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
test: background_task_queue_monitor
test: other_databases grant_role_from_non_maindb role_operations_from_non_maindb seclabel_non_maindb
test: citus_internal_access
test: function_with_case_when
# Causal clock test
test: clock

View File

@ -41,6 +41,10 @@ ALTER TABLE referencing ADD COLUMN "test_\'!7" "simple_!\'custom_type";
ALTER TABLE referencing ADD COLUMN test_8 integer CHECK (test_8 > 0);
ALTER TABLE referencing ADD COLUMN test_8 integer CONSTRAINT check_test_8 CHECK (test_8 > 0);
-- error out properly even if the REFERENCES does not include the column list of the referenced table
ALTER TABLE referencing ADD COLUMN test_9 bool, ADD COLUMN test_10 int REFERENCES referenced;
ALTER TABLE referencing ADD COLUMN test_9 bool, ADD COLUMN test_10 int REFERENCES referenced(int_col);
-- try to add test_6 again, but with IF NOT EXISTS
ALTER TABLE referencing ADD COLUMN IF NOT EXISTS test_6 text;
ALTER TABLE referencing ADD COLUMN IF NOT EXISTS test_6 integer;

View File

@ -0,0 +1,27 @@
CREATE SCHEMA function_with_case;
SET search_path TO function_with_case;
-- create function
CREATE OR REPLACE FUNCTION test_err(v1 text)
RETURNS text
LANGUAGE plpgsql
SECURITY DEFINER
AS $function$
begin
return v1 || ' - ok';
END;
$function$;
do $$ declare
lNewValues text;
val text;
begin
val = 'test';
lNewValues = test_err(v1 => case when val::text = 'test'::text then 'yes' else 'no' end);
raise notice 'lNewValues= %', lNewValues;
end;$$ ;
-- call function
SELECT test_err('test');
DROP SCHEMA function_with_case CASCADE;

View File

@ -0,0 +1,44 @@
--- Test for updating a table that has a foreign key reference to another reference table.
--- Issue #7477: Distributed deadlock after issuing a simple UPDATE statement
--- https://github.com/citusdata/citus/issues/7477
CREATE TABLE table1 (id INT PRIMARY KEY);
SELECT create_reference_table('table1');
INSERT INTO table1 VALUES (1);
CREATE TABLE table2 (
id INT,
info TEXT,
CONSTRAINT table1_id_fk FOREIGN KEY (id) REFERENCES table1 (id)
);
SELECT create_reference_table('table2');
INSERT INTO table2 VALUES (1, 'test');
--- Runs the update command in parallel on workers.
--- Due to bug #7477, before the fix, the result is non-deterministic
--- and have several rows of the form:
--- localhost | 57638 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: deadlock detected
--- localhost | 57637 | f | ERROR: canceling the transaction since it was involved in a distributed deadlock
SELECT * FROM master_run_on_worker(
ARRAY['localhost', 'localhost','localhost', 'localhost','localhost',
'localhost','localhost', 'localhost','localhost', 'localhost']::text[],
ARRAY[57638, 57637, 57637, 57638, 57637, 57638, 57637, 57638, 57638, 57637]::int[],
ARRAY['UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1',
'UPDATE table2 SET info = ''test_update'' WHERE id = 1'
]::text[],
true);
--- cleanup
DROP TABLE table2;
DROP TABLE table1;

View File

@ -100,7 +100,6 @@ create role test_role3;
\c regression - - :master_port
select 1 from citus_add_node('localhost', :worker_2_port);
-- XXX: date is not correct on one of the workers due to https://github.com/citusdata/citus/issues/7533
select result FROM run_command_on_all_nodes($$
SELECT array_to_json(array_agg(row_to_json(t)))
FROM (
@ -128,7 +127,6 @@ LIMIT 5 VALID UNTIL '2024-01-01';
\c regression - - :master_port
select 1 from citus_add_node('localhost', :worker_2_port);
-- XXX: date is not correct on one of the workers due to https://github.com/citusdata/citus/issues/7533
select result FROM run_command_on_all_nodes($$
SELECT array_to_json(array_agg(row_to_json(t)))
FROM (
@ -153,7 +151,6 @@ DROP ROLE test_role3;
\c regression - - :master_port
select 1 from citus_add_node('localhost', :worker_2_port);
-- XXX: date is not correct on one of the workers due to https://github.com/citusdata/citus/issues/7533
select result FROM run_command_on_all_nodes($$
SELECT array_to_json(array_agg(row_to_json(t)))
FROM (

View File

@ -27,6 +27,21 @@ SET datestyle = "ISO, YMD";
SELECT 1 FROM run_command_on_workers($$ALTER SYSTEM SET datestyle = "ISO, YMD";$$);
SELECT 1 FROM run_command_on_workers($$SELECT pg_reload_conf()$$);
-- In the version that we use for upgrade tests (v10.2.0), we propagate
-- "valid until" to the workers as "infinity" even if it's not set. And
-- given that "postgres" role is created in the older version, "valid until"
-- is set to "infinity" on the workers while this is not the case for
-- coordinator. See https://github.com/citusdata/citus/issues/7533.
--
-- We're fixing this for new versions of Citus and we'll probably backport
-- this to some older versions too. However, v10.2.0 won't ever have this
-- fix.
--
-- For this reason, here we set "valid until" to "infinity" for all the
-- nodes so that below query doesn't report any difference between the
-- metadata on coordinator and workers.
ALTER ROLE postgres WITH VALID UNTIL 'infinity';
-- make sure that the metadata is consistent across all nodes
-- we exclude the distributed_object_data as they are
-- not sorted in the same order (as OIDs differ on the nodes)