Merge branch 'main' into grant_parameter_propagation

grant_parameter_propagation
Gürkan İndibay 2024-01-23 12:38:26 +03:00 committed by GitHub
commit f4bec903b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 814 additions and 325 deletions

View File

@ -61,6 +61,7 @@ check-style:
# depend on install-all so that downgrade scripts are installed as well
check: all install-all
$(MAKE) -C src/test/regress check-full
# explicetely does not use $(MAKE) to avoid parallelism
make -C src/test/regress check
.PHONY: all check clean install install-downgrades install-all

View File

@ -209,12 +209,9 @@ static void ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommand
static bool HasAnyGeneratedStoredColumns(Oid relationId);
static List * GetNonGeneratedStoredColumnNameList(Oid relationId);
static void CheckAlterDistributedTableConversionParameters(TableConversionState *con);
static char * CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName,
char *sequenceName,
char *sourceSchemaName,
char *sourceName,
char *targetSchemaName,
char *targetName);
static char * CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName,
char *qualifiedSourceName,
char *qualifiedTargetName);
static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid);
static char * CreateMaterializedViewDDLCommand(Oid matViewOid);
static char * GetAccessMethodForMatViewIfExists(Oid viewOid);
@ -791,13 +788,15 @@ ConvertTableInternal(TableConversionState *con)
justBeforeDropCommands = lappend(justBeforeDropCommands, detachFromParentCommand);
}
char *qualifiedRelationName = quote_qualified_identifier(con->schemaName,
con->relationName);
if (PartitionedTable(con->relationId))
{
if (!con->suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("converting the partitions of %s",
quote_qualified_identifier(con->schemaName,
con->relationName))));
qualifiedRelationName)));
}
List *partitionList = PartitionList(con->relationId);
@ -870,9 +869,7 @@ ConvertTableInternal(TableConversionState *con)
if (!con->suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("creating a new table for %s",
quote_qualified_identifier(con->schemaName,
con->relationName))));
ereport(NOTICE, (errmsg("creating a new table for %s", qualifiedRelationName)));
}
TableDDLCommand *tableCreationCommand = NULL;
@ -999,8 +996,6 @@ ConvertTableInternal(TableConversionState *con)
{
continue;
}
char *qualifiedRelationName = quote_qualified_identifier(con->schemaName,
con->relationName);
TableConversionParameters cascadeParam = {
.relationId = colocatedTableId,
@ -1750,9 +1745,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
{
StringInfo query = makeStringInfo();
char *viewName = get_rel_name(matViewOid);
char *schemaName = get_namespace_name(get_rel_namespace(matViewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
char *qualifiedViewName = generate_qualified_relation_name(matViewOid);
/* here we need to get the access method of the view to recreate it */
char *accessMethodName = GetAccessMethodForMatViewIfExists(matViewOid);
@ -1801,9 +1794,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
bool suppressNoticeMessages)
{
char *sourceName = get_rel_name(sourceId);
char *targetName = get_rel_name(targetId);
Oid schemaId = get_rel_namespace(sourceId);
char *schemaName = get_namespace_name(schemaId);
char *qualifiedSourceName = generate_qualified_relation_name(sourceId);
char *qualifiedTargetName = generate_qualified_relation_name(targetId);
StringInfo query = makeStringInfo();
@ -1811,8 +1803,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
{
if (!suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("moving the data of %s",
quote_qualified_identifier(schemaName, sourceName))));
ereport(NOTICE, (errmsg("moving the data of %s", qualifiedSourceName)));
}
if (!HasAnyGeneratedStoredColumns(sourceId))
@ -1822,8 +1813,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
* "INSERT INTO .. SELECT *"".
*/
appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s",
quote_qualified_identifier(schemaName, targetName),
quote_qualified_identifier(schemaName, sourceName));
qualifiedTargetName, qualifiedSourceName);
}
else
{
@ -1838,9 +1828,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
char *insertColumnString = StringJoin(nonStoredColumnNameList, ',');
appendStringInfo(query,
"INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE SELECT %s FROM %s",
quote_qualified_identifier(schemaName, targetName),
insertColumnString, insertColumnString,
quote_qualified_identifier(schemaName, sourceName));
qualifiedTargetName, insertColumnString,
insertColumnString, qualifiedSourceName);
}
ExecuteQueryViaSPI(query->data, SPI_OK_INSERT);
@ -1864,14 +1853,11 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
*/
if (ShouldSyncTableMetadata(targetId))
{
Oid sequenceSchemaOid = get_rel_namespace(sequenceOid);
char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid);
char *sequenceName = get_rel_name(sequenceOid);
char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid);
char *workerChangeSequenceDependencyCommand =
CreateWorkerChangeSequenceDependencyCommand(sequenceSchemaName,
sequenceName,
schemaName, sourceName,
schemaName, targetName);
CreateWorkerChangeSequenceDependencyCommand(qualifiedSequenceName,
qualifiedSourceName,
qualifiedTargetName);
SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand);
}
else if (ShouldSyncTableMetadata(sourceId))
@ -1894,25 +1880,23 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
if (!suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("dropping the old %s",
quote_qualified_identifier(schemaName, sourceName))));
ereport(NOTICE, (errmsg("dropping the old %s", qualifiedSourceName)));
}
resetStringInfo(query);
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
quote_qualified_identifier(schemaName, sourceName));
qualifiedSourceName);
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
if (!suppressNoticeMessages)
{
ereport(NOTICE, (errmsg("renaming the new table to %s",
quote_qualified_identifier(schemaName, sourceName))));
ereport(NOTICE, (errmsg("renaming the new table to %s", qualifiedSourceName)));
}
resetStringInfo(query);
appendStringInfo(query, "ALTER TABLE %s RENAME TO %s",
quote_qualified_identifier(schemaName, targetName),
qualifiedTargetName,
quote_identifier(sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
}
@ -2172,18 +2156,13 @@ CheckAlterDistributedTableConversionParameters(TableConversionState *con)
* worker_change_sequence_dependency query with the parameters.
*/
static char *
CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequenceName,
char *sourceSchemaName, char *sourceName,
char *targetSchemaName, char *targetName)
CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequeceName,
char *qualifiedSourceName,
char *qualifiedTargetName)
{
char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName,
sequenceName);
char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName);
char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName);
StringInfo query = makeStringInfo();
appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)",
quote_literal_cstr(qualifiedSchemaName),
quote_literal_cstr(qualifiedSequeceName),
quote_literal_cstr(qualifiedSourceName),
quote_literal_cstr(qualifiedTargetName));

View File

@ -1160,9 +1160,7 @@ DropIdentitiesOnTable(Oid relationId)
if (attributeForm->attidentity)
{
char *tableName = get_rel_name(relationId);
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
char *qualifiedTableName = generate_qualified_relation_name(relationId);
StringInfo dropCommand = makeStringInfo();
@ -1222,9 +1220,7 @@ DropViewsOnTable(Oid relationId)
Oid viewId = InvalidOid;
foreach_oid(viewId, reverseOrderedViews)
{
char *viewName = get_rel_name(viewId);
char *schemaName = get_namespace_name(get_rel_namespace(viewId));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
char *qualifiedViewName = generate_qualified_relation_name(viewId);
StringInfo dropCommand = makeStringInfo();
appendStringInfo(dropCommand, "DROP %sVIEW IF EXISTS %s",

View File

@ -0,0 +1,131 @@
/*-------------------------------------------------------------------------
*
* comment.c
* Commands to interact with the comments for all database
* object types.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "catalog/pg_shdescription.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "distributed/comment.h"
static char * GetCommentForObject(Oid classOid, Oid objectOid);
List *
GetCommentPropagationCommands(Oid classOid, Oid objOoid, char *objectName, ObjectType
objectType)
{
List *commands = NIL;
StringInfo commentStmt = makeStringInfo();
/* Get the comment for the database */
char *comment = GetCommentForObject(classOid, objOoid);
char const *commentObjectType = ObjectTypeNames[objectType];
/* Create the SQL command to propagate the comment to other nodes */
if (comment != NULL)
{
appendStringInfo(commentStmt, "COMMENT ON %s %s IS %s;", commentObjectType,
quote_identifier(objectName),
quote_literal_cstr(comment));
}
/* Add the command to the list */
if (commentStmt->len > 0)
{
commands = list_make1(commentStmt->data);
}
return commands;
}
static char *
GetCommentForObject(Oid classOid, Oid objectOid)
{
HeapTuple tuple;
char *comment = NULL;
/* Open pg_shdescription catalog */
Relation shdescRelation = table_open(SharedDescriptionRelationId, AccessShareLock);
/* Scan the table */
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0],
Anum_pg_shdescription_objoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(objectOid));
ScanKeyInit(&scanKey[1],
Anum_pg_shdescription_classoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classOid));
bool indexOk = true;
int scanKeyCount = 2;
SysScanDesc scan = systable_beginscan(shdescRelation, SharedDescriptionObjIndexId,
indexOk, NULL, scanKeyCount,
scanKey);
if ((tuple = systable_getnext(scan)) != NULL)
{
bool isNull = false;
TupleDesc tupdesc = RelationGetDescr(shdescRelation);
Datum descDatum = heap_getattr(tuple, Anum_pg_shdescription_description, tupdesc,
&isNull);
/* Add the command to the list */
if (!isNull)
{
comment = TextDatumGetCString(descDatum);
}
else
{
comment = NULL;
}
}
/* End the scan and close the catalog */
systable_endscan(scan);
table_close(shdescRelation, AccessShareLock);
return comment;
}
/*
* CommentObjectAddress resolves the ObjectAddress for the object
* on which the comment is placed. Optionally errors if the object does not
* exist based on the missing_ok flag passed in by the caller.
*/
List *
CommentObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
{
CommentStmt *stmt = castNode(CommentStmt, node);
Relation relation;
ObjectAddress objectAddress = get_object_address(stmt->objtype, stmt->object,
&relation, AccessExclusiveLock,
missing_ok);
ObjectAddress *objectAddressCopy = palloc0(sizeof(ObjectAddress));
*objectAddressCopy = objectAddress;
return list_make1(objectAddressCopy);
}

View File

@ -1323,10 +1323,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{
List *partitionList = PartitionList(relationId);
Oid partitionRelationId = InvalidOid;
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
char *parentRelationName = quote_qualified_identifier(schemaName, relationName);
char *parentRelationName = generate_qualified_relation_name(relationId);
/*
* when there are many partitions, each call to CreateDistributedTable

View File

@ -13,8 +13,10 @@
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_collation.h"
@ -25,6 +27,7 @@
#include "commands/defrem.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
@ -33,6 +36,7 @@
#include "distributed/adaptive_executor.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/comment.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
@ -45,7 +49,6 @@
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
@ -672,6 +675,31 @@ GetTablespaceName(Oid tablespaceOid)
}
/*
* GetDatabaseMetadataSyncCommands returns a list of sql statements
* for the given database id. The list contains the database ddl command,
* grant commands and comment propagation commands.
*/
List *
GetDatabaseMetadataSyncCommands(Oid dbOid)
{
char *databaseName = get_database_name(dbOid);
char *databaseDDLCommand = CreateDatabaseDDLCommand(dbOid);
List *ddlCommands = list_make1(databaseDDLCommand);
List *grantDDLCommands = GrantOnDatabaseDDLCommands(dbOid);
List *commentDDLCommands = GetCommentPropagationCommands(DatabaseRelationId, dbOid,
databaseName,
OBJECT_DATABASE);
ddlCommands = list_concat(ddlCommands, grantDDLCommands);
ddlCommands = list_concat(ddlCommands, commentDDLCommands);
return ddlCommands;
}
/*
* GetDatabaseCollation gets oid of a database and returns all the collation related information
* We need this method since collation related info in Form_pg_database is not accessible.

View File

@ -584,15 +584,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
*/
if (dependency->objectId != MyDatabaseId && EnableCreateDatabasePropagation)
{
char *databaseDDLCommand = CreateDatabaseDDLCommand(dependency->objectId);
List *ddlCommands = list_make1(databaseDDLCommand);
List *grantDDLCommands = GrantOnDatabaseDDLCommands(dependency->objectId);
ddlCommands = list_concat(ddlCommands, grantDDLCommands);
return ddlCommands;
return GetDatabaseMetadataSyncCommands(dependency->objectId);
}
return NIL;

View File

@ -16,6 +16,7 @@
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/comment.h"
#include "distributed/deparser.h"
#include "distributed/version_compat.h"
@ -304,6 +305,17 @@ static DistributeObjectOps Any_DropRole = {
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Role_Comment = {
.deparse = DeparseCommentStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
.address = CommentObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Any_CreateForeignServer = {
.deparse = DeparseCreateForeignServerStmt,
.qualify = NULL,
@ -533,6 +545,17 @@ static DistributeObjectOps Database_Set = {
.markDistributed = false,
};
static DistributeObjectOps Database_Comment = {
.deparse = DeparseCommentStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
.address = CommentObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Database_Rename = {
.deparse = DeparseAlterDatabaseRenameStmt,
.qualify = NULL,
@ -986,13 +1009,18 @@ static DistributeObjectOps TextSearchConfig_AlterOwner = {
.markDistributed = false,
};
static DistributeObjectOps TextSearchConfig_Comment = {
.deparse = DeparseTextSearchConfigurationCommentStmt,
.deparse = DeparseCommentStmt,
/* TODO: When adding new comment types we should create an abstracted
* qualify function, just like we have an abstract deparse
* and adress function
*/
.qualify = QualifyTextSearchConfigurationCommentStmt,
.preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL,
.objectType = OBJECT_TSCONFIGURATION,
.operationType = DIST_OPS_ALTER,
.address = TextSearchConfigurationCommentObjectAddress,
.address = CommentObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps TextSearchConfig_Define = {
@ -1055,13 +1083,13 @@ static DistributeObjectOps TextSearchDict_AlterOwner = {
.markDistributed = false,
};
static DistributeObjectOps TextSearchDict_Comment = {
.deparse = DeparseTextSearchDictionaryCommentStmt,
.deparse = DeparseCommentStmt,
.qualify = QualifyTextSearchDictionaryCommentStmt,
.preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL,
.objectType = OBJECT_TSDICTIONARY,
.operationType = DIST_OPS_ALTER,
.address = TextSearchDictCommentObjectAddress,
.address = CommentObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps TextSearchDict_Define = {
@ -1794,6 +1822,16 @@ GetDistributeObjectOps(Node *node)
return &TextSearchDict_Comment;
}
case OBJECT_DATABASE:
{
return &Database_Comment;
}
case OBJECT_ROLE:
{
return &Role_Comment;
}
default:
{
return &NoDistributeOps;

View File

@ -2547,12 +2547,8 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
if (columnNulls[partitionColumnIndex])
{
Oid relationId = copyDest->distributedRelationId;
char *relationName = get_rel_name(relationId);
Oid schemaOid = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaOid);
char *qualifiedTableName = quote_qualified_identifier(schemaName,
relationName);
char *qualifiedTableName = generate_qualified_relation_name(
copyDest->distributedRelationId);
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column of table %s cannot be NULL",

View File

@ -45,6 +45,7 @@
#include "distributed/citus_safe_lib.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/comment.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
@ -582,6 +583,17 @@ GenerateCreateOrAlterRoleCommand(Oid roleOid)
{
completeRoleList = lappend(completeRoleList, DeparseTreeNode(stmt));
}
/*
* append COMMENT ON ROLE commands for this specific user
* When we propagate user creation, we also want to make sure that we propagate
* all the comments it has been given. For this, we check pg_shdescription
* for the ROLE entry corresponding to roleOid, and generate the relevant
* Comment stmts to be run in the new node.
*/
List *commentStmts = GetCommentPropagationCommands(AuthIdRelationId, roleOid,
rolename, OBJECT_ROLE);
completeRoleList = list_concat(completeRoleList, commentStmts);
}
return completeRoleList;

View File

@ -790,45 +790,6 @@ AlterTextSearchDictionarySchemaStmtObjectAddress(Node *node, bool missing_ok, bo
}
/*
* TextSearchConfigurationCommentObjectAddress resolves the ObjectAddress for the TEXT
* SEARCH CONFIGURATION on which the comment is placed. Optionally errors if the
* configuration does not exist based on the missing_ok flag passed in by the caller.
*/
List *
TextSearchConfigurationCommentObjectAddress(Node *node, bool missing_ok, bool
isPostprocess)
{
CommentStmt *stmt = castNode(CommentStmt, node);
Assert(stmt->objtype == OBJECT_TSCONFIGURATION);
Oid objid = get_ts_config_oid(castNode(List, stmt->object), missing_ok);
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*address, TSConfigRelationId, objid);
return list_make1(address);
}
/*
* TextSearchDictCommentObjectAddress resolves the ObjectAddress for the TEXT SEARCH
* DICTIONARY on which the comment is placed. Optionally errors if the dictionary does not
* exist based on the missing_ok flag passed in by the caller.
*/
List *
TextSearchDictCommentObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
{
CommentStmt *stmt = castNode(CommentStmt, node);
Assert(stmt->objtype == OBJECT_TSDICTIONARY);
Oid objid = get_ts_dict_oid(castNode(List, stmt->object), missing_ok);
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*address, TSDictionaryRelationId, objid);
return list_make1(address);
}
/*
* AlterTextSearchConfigurationOwnerObjectAddress resolves the ObjectAddress for the TEXT
* SEARCH CONFIGURATION for which the owner is changed. Optionally errors if the

View File

@ -392,9 +392,7 @@ CreateViewDDLCommand(Oid viewOid)
static void
AppendQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid)
{
char *viewName = get_rel_name(viewOid);
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
char *qualifiedViewName = generate_qualified_relation_name(viewOid);
appendStringInfo(buf, "%s ", qualifiedViewName);
}

View File

@ -0,0 +1,77 @@
/*-------------------------------------------------------------------------
*
* deparse_coment_stmts.c
*
* All routines to deparse comment statements.
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "commands/defrem.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "parser/parse_type.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "pg_version_compat.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/comment.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/log_utils.h"
const char *ObjectTypeNames[] =
{
[OBJECT_DATABASE] = "DATABASE",
[OBJECT_ROLE] = "ROLE",
[OBJECT_TSCONFIGURATION] = "TEXT SEARCH CONFIGURATION",
[OBJECT_TSDICTIONARY] = "TEXT SEARCH DICTIONARY",
/* When support for propagating comments to new objects is introduced, an entry for each
* statement type should be added to this list. The first element in each entry is the 'object_type' keyword
* that will be included in the 'COMMENT ON <object_type> ..' statement (i.e. DATABASE,). The second element is the type of
* stmt->object, which represents the name of the propagated object.
*/
};
char *
DeparseCommentStmt(Node *node)
{
CommentStmt *stmt = castNode(CommentStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
const char *objectName = NULL;
if (IsA(stmt->object, String))
{
objectName = quote_identifier(strVal(stmt->object));
}
else if (IsA(stmt->object, List))
{
objectName = NameListToQuotedString(castNode(List, stmt->object));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unknown object type")));
}
const char *objectType = ObjectTypeNames[stmt->objtype];
char *comment = stmt->comment != NULL ? quote_literal_cstr(stmt->comment) : "NULL";
appendStringInfo(&str, "COMMENT ON %s %s IS %s;", objectType, objectName, comment);
return str.data;
}

View File

@ -395,68 +395,6 @@ DeparseAlterTextSearchDictionarySchemaStmt(Node *node)
}
/*
* DeparseTextSearchConfigurationCommentStmt returns the sql statement representing
* COMMENT ON TEXT SEARCH CONFIGURATION ... IS ...
*/
char *
DeparseTextSearchConfigurationCommentStmt(Node *node)
{
CommentStmt *stmt = castNode(CommentStmt, node);
Assert(stmt->objtype == OBJECT_TSCONFIGURATION);
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf, "COMMENT ON TEXT SEARCH CONFIGURATION %s IS ",
NameListToQuotedString(castNode(List, stmt->object)));
if (stmt->comment == NULL)
{
appendStringInfoString(&buf, "NULL");
}
else
{
appendStringInfoString(&buf, quote_literal_cstr(stmt->comment));
}
appendStringInfoString(&buf, ";");
return buf.data;
}
/*
* DeparseTextSearchDictionaryCommentStmt returns the sql statement representing
* COMMENT ON TEXT SEARCH DICTIONARY ... IS ...
*/
char *
DeparseTextSearchDictionaryCommentStmt(Node *node)
{
CommentStmt *stmt = castNode(CommentStmt, node);
Assert(stmt->objtype == OBJECT_TSDICTIONARY);
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf, "COMMENT ON TEXT SEARCH DICTIONARY %s IS ",
NameListToQuotedString(castNode(List, stmt->object)));
if (stmt->comment == NULL)
{
appendStringInfoString(&buf, "NULL");
}
else
{
appendStringInfoString(&buf, quote_literal_cstr(stmt->comment));
}
appendStringInfoString(&buf, ";");
return buf.data;
}
/*
* AppendStringInfoTokentypeList specializes in adding a comma separated list of
* token_tyoe's to TEXT SEARCH CONFIGURATION commands

View File

@ -401,7 +401,7 @@ typedef struct WorkerPool
/*
* Placement executions destined for worker node, but not assigned to any
* connection and not ready to start.
* connection and ready to start.
*/
dlist_head readyTaskQueue;
int readyTaskCount;
@ -492,8 +492,6 @@ typedef struct WorkerSession
} WorkerSession;
struct TaskPlacementExecution;
/* GUC, determining whether Citus opens 1 connection per task */
bool ForceMaxQueryParallelization = false;
int MaxAdaptiveExecutorPoolSize = 16;
@ -585,7 +583,7 @@ typedef enum TaskPlacementExecutionState
} TaskPlacementExecutionState;
/*
* TaskPlacementExecution represents the an execution of a command
* TaskPlacementExecution represents the execution of a command
* on a shard placement.
*/
typedef struct TaskPlacementExecution
@ -1908,7 +1906,7 @@ RunDistributedExecution(DistributedExecution *execution)
/*
* Iterate until all the tasks are finished. Once all the tasks
* are finished, ensure that that all the connection initializations
* are finished, ensure that all the connection initializations
* are also finished. Otherwise, those connections are terminated
* abruptly before they are established (or failed). Instead, we let
* the ConnectionStateMachine() to properly handle them.
@ -3118,7 +3116,7 @@ ConnectionStateMachine(WorkerSession *session)
*
* We can only retry connection when the remote transaction has
* not started over the connection. Otherwise, we'd have to deal
* with restoring the transaction state, which iis beyond our
* with restoring the transaction state, which is beyond our
* purpose at this time.
*/
RemoteTransaction *transaction = &connection->remoteTransaction;

View File

@ -143,15 +143,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
targetRelation->partitionColumn);
if (distributionColumnIndex == -1)
{
char *relationName = get_rel_name(targetRelationId);
Oid schemaOid = get_rel_namespace(targetRelationId);
char *schemaName = get_namespace_name(schemaOid);
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg(
"the partition column of table %s should have a value",
quote_qualified_identifier(schemaName,
relationName))));
generate_qualified_relation_name(targetRelationId))));
}
TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList,

View File

@ -168,7 +168,7 @@ CitusExecutorRun(QueryDesc *queryDesc,
executorBoundParams = queryDesc->params;
/*
* We do some potentially time consuming operations our self now before we hand of
* We do some potentially time consuming operations ourself now before we hand off
* control to postgres' executor. To make sure that time spent is accurately measured
* we remove the totaltime instrumentation from the queryDesc. Instead we will start
* and stop the instrumentation of the total time and put it back on the queryDesc

View File

@ -1945,11 +1945,7 @@ ConstructQualifiedShardName(ShardInterval *shardInterval)
static List *
RecreateTableDDLCommandList(Oid relationId)
{
const char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
const char *relationSchemaName = get_namespace_name(relationSchemaId);
const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName,
relationName);
const char *qualifiedRelationName = generate_qualified_relation_name(relationId);
StringInfo dropCommand = makeStringInfo();

View File

@ -252,7 +252,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
/*
* Distribution column must be used in a simple equality match check and it must be
* place at top level conjustion operator. In simple words, we should have
* place at top level conjunction operator. In simple words, we should have
* WHERE dist_key = VALUE [AND ....];
*
* We're also not allowing any other appearances of the distribution key in the quals.

View File

@ -197,9 +197,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
if (!ExplainDistributedQueries)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str, "explain statements for distributed queries ");
appendStringInfo(es->str, "are not enabled\n");
ExplainPropertyBool("citus.explain_distributed_queries", false, es);
return;
}

View File

@ -715,8 +715,8 @@ MultiNodeTree(Query *queryTree)
/*
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains
* a call to the read_intermediate_result function.
* ContainsReadIntermediateResultFunction determines whether an expression tree
* contains a call to the read_intermediate_result function.
*/
bool
ContainsReadIntermediateResultFunction(Node *node)
@ -726,7 +726,7 @@ ContainsReadIntermediateResultFunction(Node *node)
/*
* ContainsReadIntermediateResultArrayFunction determines whether an expresion
* ContainsReadIntermediateResultArrayFunction determines whether an expression
* tree contains a call to the read_intermediate_results(result_ids, format)
* function.
*/

View File

@ -434,7 +434,7 @@ ExtractSelectRangeTableEntry(Query *query)
* for the given modification query.
*
* The function errors out if the input query is not a
* modify query (e.g., INSERT, UPDATE or DELETE). So, this
* modify query (e.g., INSERT, UPDATE, DELETE or MERGE). So, this
* function is not expected to be called on SELECT queries.
*/
Oid
@ -2271,13 +2271,13 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
/*
* RouterQuery runs router pruning logic for SELECT, UPDATE, DELETE, and MERGE queries.
* If there are shards present and query is routable, all RTEs have been updated
* to point to the relevant shards in the originalQuery. Also, placementList is
* filled with the list of worker nodes that has all the required shard placements
* for the query execution. anchorShardId is set to the first pruned shardId of
* the given query. Finally, relationShardList is filled with the list of
* relation-to-shard mappings for the query.
* PlanRouterQuery runs router pruning logic for SELECT, UPDATE, DELETE, and
* MERGE queries. If there are shards present and query is routable, all RTEs
* have been updated to point to the relevant shards in the originalQuery. Also,
* placementList is filled with the list of worker nodes that has all the
* required shard placements for the query execution. anchorShardId is set to
* the first pruned shardId of the given query. Finally, relationShardList is
* filled with the list of relation-to-shard mappings for the query.
*
* If the given query is not routable, it fills planningError with the related
* DeferredErrorMessage. The caller can check this error message to see if query
@ -2510,7 +2510,7 @@ AllShardsColocated(List *relationShardList)
if (currentTableType == RANGE_DISTRIBUTED ||
currentTableType == APPEND_DISTRIBUTED)
{
/* we do not have further strict colocation chceks */
/* we do not have further strict colocation checks */
continue;
}
}
@ -2932,7 +2932,7 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
}
/*
* Different resrictions might have different partition columns.
* Different restrictions might have different partition columns.
* We report partition column value if there is only one.
*/
if (multiplePartitionValuesExist)

View File

@ -170,14 +170,10 @@ WorkerDropDistributedTable(Oid relationId)
*/
if (!IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL))
{
char *relName = get_rel_name(relationId);
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
StringInfo dropCommand = makeStringInfo();
appendStringInfo(dropCommand, "DROP%sTABLE %s CASCADE",
IsForeignTable(relationId) ? " FOREIGN " : " ",
quote_qualified_identifier(schemaName, relName));
generate_qualified_relation_name(relationId));
Node *dropCommandNode = ParseTreeNode(dropCommand->data);

View File

@ -92,38 +92,21 @@ CitusNodeTagI(Node *node)
return ((CitusNode*)(node))->citus_tag;
}
/*
* Postgres's nodes/nodes.h has more information on why we do this.
*/
#ifdef __GNUC__
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ CitusNode *_result; \
AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \
_result = (CitusNode *) palloc0fast(size); \
_result->extensible.type = T_ExtensibleNode; \
_result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \
_result->citus_tag =(int) (tag); \
_result; \
})
static inline CitusNode *
CitusNewNode(size_t size, CitusNodeTag tag)
{
CitusNode *result;
#else
extern CitusNode *newCitusNodeMacroHolder;
#define CitusNewNode(size, tag) \
( \
AssertMacro((size) >= sizeof(CitusNode)), /* need the tag, at least */ \
newCitusNodeMacroHolder = (CitusNode *) palloc0fast(size), \
newCitusNodeMacroHolder->extensible.type = T_ExtensibleNode, \
newCitusNodeMacroHolder->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START], \
newCitusNodeMacroHolder->citus_tag =(int) (tag), \
newCitusNodeMacroHolder \
)
#endif
Assert(size >= sizeof(CitusNode)); /* need the ExtensibleNode and the tag, at least */
result = (CitusNode *) palloc0(size);
result->extensible.type = T_ExtensibleNode;
result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START];
result->citus_tag = (int) (tag);
return result;
}
/*
* IsA equivalent that compares node tags, including Citus-specific nodes.

View File

@ -230,6 +230,7 @@ extern List * PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * GetDatabaseMetadataSyncCommands(Oid dbOid);
extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
@ -699,11 +700,6 @@ extern List * AlterTextSearchConfigurationSchemaStmtObjectAddress(Node *node,
extern List * AlterTextSearchDictionarySchemaStmtObjectAddress(Node *node,
bool missing_ok, bool
isPostprocess);
extern List * TextSearchConfigurationCommentObjectAddress(Node *node,
bool missing_ok, bool
isPostprocess);
extern List * TextSearchDictCommentObjectAddress(Node *node,
bool missing_ok, bool isPostprocess);
extern List * AlterTextSearchConfigurationOwnerObjectAddress(Node *node,
bool missing_ok, bool
isPostprocess);

View File

@ -0,0 +1,26 @@
/*-------------------------------------------------------------------------
*
* comment.h
* Declarations for comment related operations.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef COMMENT_H
#define COMMENT_H
#include "postgres.h"
#include "nodes/parsenodes.h"
extern const char *ObjectTypeNames[];
extern List * GetCommentPropagationCommands(Oid classOid, Oid oid, char *objectName,
ObjectType objectType);
extern List * CommentObjectAddress(Node *node, bool missing_ok, bool isPostprocess);
# endif /* COMMENT_H */

View File

@ -143,6 +143,9 @@ extern void DefElemOptionToStatement(StringInfo buf, DefElem *option,
const DefElemOptionFormat *opt_formats,
int opt_formats_len);
/* forward declarations for deparse_comment_stmts.c */
extern char * DeparseCommentStmt(Node *node);
/* forward declarations for deparse_statistics_stmts.c */
extern char * DeparseCreateStatisticsStmt(Node *node);

View File

@ -238,8 +238,8 @@ typedef struct Task
TaskQuery taskQuery;
/*
* A task can have multiple queries, in which case queryCount will be > 1. If
* a task has more one query, then taskQuery->queryType == TASK_QUERY_TEXT_LIST.
* A task can have multiple queries, in which case queryCount will be > 1, and
* taskQuery->queryType == TASK_QUERY_TEXT_LIST.
*/
int queryCount;
@ -290,7 +290,7 @@ typedef struct Task
/*
* When we evaluate functions and parameters in the query string then
* we should no longer send the list of parameters long with the
* we should no longer send the list of parameters along with the
* query.
*/
bool parametersInQueryStringResolved;

View File

@ -215,7 +215,7 @@ s/^(ERROR: The index name \(test_index_creation1_p2020_09_26)_([0-9])+_(tenant_
s/^(DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26)_([0-9])+_(tenant_id_timeperiod_idx)/\1_xxxxxx_\3/g
# normalize errors for not being able to connect to a non-existing host
s/could not translate host name "foobar" to address: .*$/could not translate host name "foobar" to address: <system specific error>/g
s/could not translate host name "([A-Za-z0-9\.\-]+)" to address: .*$/could not translate host name "\1" to address: <system specific error>/g
# ignore PL/pgSQL line numbers that differ on Mac builds
s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g

View File

@ -153,6 +153,9 @@ DEPS = {
"isolation_extension_commands": TestDeps(
None, ["isolation_setup", "isolation_add_remove_node"]
),
"isolation_update_node": TestDeps(
None, ["isolation_setup", "isolation_add_remove_node"]
),
"schema_based_sharding": TestDeps("minimal_schedule"),
"multi_sequence_default": TestDeps(
None, ["multi_test_helpers", "multi_cluster_management", "multi_table_ddl"]

View File

@ -82,6 +82,7 @@ the name of the fixture. For example:
```python
def test_some_query(cluster):
cluster.coordinator.sql("SELECT 1")
assert cluster.workers[0].sql_value('SELECT 2') == 2
```
If you need a cluster of a specific size you can use the `cluster_factory`

View File

@ -0,0 +1,101 @@
set citus.log_remote_commands to on;
set citus.enable_create_database_propagation to on;
set citus.grep_remote_commands to 'COMMENT ON DATABASE';
create database "test1-\!escape";
comment on DATABASE "test1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
test-comment
(3 rows)
comment on DATABASE "test1-\!escape" is 'comment-needs\!escape';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
result
---------------------------------------------------------------------
comment-needs\!escape
comment-needs\!escape
comment-needs\!escape
(3 rows)
comment on DATABASE "test1-\!escape" is null;
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
result
---------------------------------------------------------------------
(3 rows)
drop DATABASE "test1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
create database "test1-\!escape";
comment on DATABASE "test1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
(2 rows)
select 1 from citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
test-comment
(3 rows)
drop DATABASE "test1-\!escape";
reset citus.enable_create_database_propagation;
reset citus.grep_remote_commands;
reset citus.log_remote_commands;

View File

@ -0,0 +1,99 @@
set citus.log_remote_commands to on;
set citus.grep_remote_commands to 'COMMENT ON ROLE';
create role "role1-\!escape";
comment on ROLE "role1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
test-comment
(3 rows)
comment on role "role1-\!escape" is 'comment-needs\!escape';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
result
---------------------------------------------------------------------
comment-needs\!escape
comment-needs\!escape
comment-needs\!escape
(3 rows)
comment on role "role1-\!escape" is NULL;
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
result
---------------------------------------------------------------------
(3 rows)
drop role "role1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
create role "role1-\!escape";
comment on ROLE "role1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
(2 rows)
select 1 from citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
test-comment
(3 rows)
drop role "role1-\!escape";
reset citus.grep_remote_commands;
reset citus.log_remote_commands;

View File

@ -93,8 +93,8 @@ nodeid|nodename|nodeport
starting permutation: s1-begin s1-update-node-1 s2-begin s2-update-node-1 s1-commit s2-abort s1-show-nodes s3-update-node-1-back s3-manually-fix-metadata
nodeid|nodename |nodeport
---------------------------------------------------------------------
25|localhost| 57638
24|localhost| 57637
23|localhost| 57638
22|localhost| 57637
(2 rows)
step s1-begin:
@ -139,8 +139,8 @@ step s1-show-nodes:
nodeid|nodename |nodeport|isactive
---------------------------------------------------------------------
25|localhost| 57638|t
24|localhost| 58637|t
23|localhost| 57638|t
22|localhost| 58637|t
(2 rows)
step s3-update-node-1-back:
@ -178,8 +178,8 @@ nodeid|nodename|nodeport
starting permutation: s2-create-table s1-begin s1-update-node-nonexistent s1-prepare-transaction s2-cache-prepared-statement s1-commit-prepared s2-execute-prepared s1-update-node-existent s3-manually-fix-metadata
nodeid|nodename |nodeport
---------------------------------------------------------------------
27|localhost| 57638
26|localhost| 57637
23|localhost| 57638
22|localhost| 57637
(2 rows)
step s2-create-table:
@ -250,7 +250,7 @@ count
step s1-commit-prepared:
COMMIT prepared 'label';
s2: WARNING: connection to the remote node non-existent:57637 failed with the following error: could not translate host name "non-existent" to address: Name or service not known
s2: WARNING: connection to the remote node non-existent:57637 failed with the following error: could not translate host name "non-existent" to address: <system specific error>
step s2-execute-prepared:
EXECUTE foo;

View File

@ -80,7 +80,7 @@ DEBUG: join prunable for intervals [0,2147483647] and [-2147483648,-1]
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(2 rows)
SET client_min_messages TO LOG;
@ -96,7 +96,7 @@ LOG: join order: [ "lineitem" ][ local partition join "orders" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
EXPLAIN (COSTS OFF)
@ -111,7 +111,7 @@ LOG: join order: [ "orders" ][ dual partition join "lineitem_hash" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Verify we handle local joins between two hash-partitioned tables.
@ -123,7 +123,7 @@ LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Validate that we can handle broadcast joins with hash-partitioned tables.
@ -135,7 +135,7 @@ LOG: join order: [ "customer_hash" ][ reference join "nation" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Validate that we don't use a single-partition join method for a hash
@ -148,7 +148,7 @@ LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Validate that we don't chose a single-partition join method with a
@ -161,7 +161,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Validate that we can re-partition a hash partitioned table to join with a
@ -174,7 +174,7 @@ LOG: join order: [ "orders_hash" ][ dual partition join "customer_append" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Validate a 4 way join that could be done locally is planned as such by the logical
@ -199,7 +199,7 @@ LOG: join order: [ "users_table" ][ local partition join "events_table" ][ loca
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Reset client logging level to its previous value

View File

@ -26,7 +26,7 @@ LOG: join order: [ "lineitem" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Query #3 from the TPC-H decision support benchmark
@ -61,7 +61,7 @@ LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partitio
-> HashAggregate
Group Key: remote_scan.l_orderkey, remote_scan.o_orderdate, remote_scan.o_shippriority
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(6 rows)
-- Query #10 from the TPC-H decision support benchmark
@ -103,7 +103,7 @@ LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partitio
Sort
Sort Key: remote_scan.revenue DESC
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(4 rows)
-- Query #19 from the TPC-H decision support benchmark (modified)
@ -142,7 +142,7 @@ LOG: join order: [ "lineitem" ][ dual partition join "part_append" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Query to test multiple re-partition jobs in a single query
@ -163,7 +163,7 @@ LOG: join order: [ "lineitem" ][ local partition join "orders" ][ dual partitio
HashAggregate
Group Key: remote_scan.l_partkey
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(4 rows)
-- Reset client logging level to its previous value

View File

@ -21,7 +21,7 @@ LOG: join order: [ "lineitem" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Query #3 from the TPC-H decision support benchmark
@ -54,7 +54,7 @@ LOG: join order: [ "orders" ][ reference join "customer" ][ local partition joi
Sort
Sort Key: remote_scan.revenue DESC, remote_scan.o_orderdate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(4 rows)
-- Query #10 from the TPC-H decision support benchmark
@ -98,7 +98,7 @@ LOG: join order: [ "orders" ][ reference join "customer" ][ reference join "nat
-> HashAggregate
Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(6 rows)
-- Query #19 from the TPC-H decision support benchmark (modified)
@ -137,7 +137,7 @@ LOG: join order: [ "lineitem" ][ reference join "part" ]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Reset client logging level to its previous value

View File

@ -108,7 +108,7 @@ DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AM
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
EXPLAIN (COSTS OFF)
@ -122,7 +122,7 @@ DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)]
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Test that large table joins on partition varchar columns work
@ -137,7 +137,7 @@ DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
SET client_min_messages TO WARNING;

View File

@ -79,7 +79,7 @@ DEBUG: assigned task to node localhost:xxxxx
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
@ -93,7 +93,7 @@ DEBUG: assigned task to node localhost:xxxxx
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
-- Next test the first-replica task assignment policy
@ -109,7 +109,7 @@ DEBUG: assigned task to node localhost:xxxxx
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
@ -123,7 +123,7 @@ DEBUG: assigned task to node localhost:xxxxx
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(3 rows)
COMMIT;
@ -145,7 +145,7 @@ DEBUG: Creating router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
@ -154,7 +154,7 @@ DEBUG: Creating router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(2 rows)
SET LOCAL citus.task_assignment_policy TO 'first-replica';
@ -164,7 +164,7 @@ DEBUG: Creating router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
@ -173,7 +173,7 @@ DEBUG: Creating router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled
citus.explain_distributed_queries: false
(2 rows)
ROLLBACK;

View File

@ -39,6 +39,9 @@ test: create_drop_database_propagation
test: create_drop_database_propagation_pg15
test: create_drop_database_propagation_pg16
test: grant_on_parameter_propagation
test: comment_on_database
test: comment_on_role
# don't parallelize single_shard_table_udfs to make sure colocation ids are sequential
test: single_shard_table_udfs
test: schema_based_sharding

View File

@ -3,6 +3,8 @@ setup
-- revert back to pg_isolation_test_session_is_blocked until the tests are fixed
SELECT citus_internal.restore_isolation_tester_func();
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 22;
SELECT 1 FROM master_add_node('localhost', 57637);
SELECT 1 FROM master_add_node('localhost', 57638);

View File

@ -0,0 +1,73 @@
set citus.log_remote_commands to on;
set citus.enable_create_database_propagation to on;
set citus.grep_remote_commands to 'COMMENT ON DATABASE';
create database "test1-\!escape";
comment on DATABASE "test1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
comment on DATABASE "test1-\!escape" is 'comment-needs\!escape';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
comment on DATABASE "test1-\!escape" is null;
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
drop DATABASE "test1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
create database "test1-\!escape";
comment on DATABASE "test1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
select 1 from citus_add_node('localhost', :worker_2_port);
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
drop DATABASE "test1-\!escape";
reset citus.enable_create_database_propagation;
reset citus.grep_remote_commands;
reset citus.log_remote_commands;

View File

@ -0,0 +1,72 @@
set citus.log_remote_commands to on;
set citus.grep_remote_commands to 'COMMENT ON ROLE';
create role "role1-\!escape";
comment on ROLE "role1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
comment on role "role1-\!escape" is 'comment-needs\!escape';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
comment on role "role1-\!escape" is NULL;
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
drop role "role1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
create role "role1-\!escape";
comment on ROLE "role1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
select 1 from citus_add_node('localhost', :worker_2_port);
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
drop role "role1-\!escape";
reset citus.grep_remote_commands;
reset citus.log_remote_commands;