Merge remote-tracking branch 'upstream/main' into issue/6694

issue/6694
Gokhan Gulbiz 2023-03-30 09:38:16 +03:00
commit 24db425288
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
61 changed files with 3657 additions and 99 deletions

View File

@ -217,6 +217,7 @@ static bool WillRecreateForeignKeyToReferenceTable(Oid relationId,
CascadeToColocatedOption cascadeOption);
static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId);
static void ErrorIfUnsupportedCascadeObjects(Oid relationId);
static List * WrapTableDDLCommands(List *commandStrings);
static bool DoesCascadeDropUnsupportedObject(Oid classId, Oid id, HTAB *nodeMap);
static TableConversionReturn * CopyTableConversionReturnIntoCurrentContext(
TableConversionReturn *tableConversionReturn);
@ -604,9 +605,18 @@ ConvertTableInternal(TableConversionState *con)
List *justBeforeDropCommands = NIL;
List *attachPartitionCommands = NIL;
postLoadCommands =
list_concat(postLoadCommands,
GetViewCreationTableDDLCommandsOfTable(con->relationId));
List *createViewCommands = GetViewCreationCommandsOfTable(con->relationId);
postLoadCommands = list_concat(postLoadCommands,
WrapTableDDLCommands(createViewCommands));
/* need to add back to publications after dropping the original table */
bool isAdd = true;
List *alterPublicationCommands =
GetAlterPublicationDDLCommandsForTable(con->relationId, isAdd);
postLoadCommands = list_concat(postLoadCommands,
WrapTableDDLCommands(alterPublicationCommands));
List *foreignKeyCommands = NIL;
if (con->conversionType == ALTER_DISTRIBUTED_TABLE)
@ -1500,17 +1510,16 @@ GetViewCreationCommandsOfTable(Oid relationId)
/*
* GetViewCreationTableDDLCommandsOfTable is the same as GetViewCreationCommandsOfTable,
* but the returned list includes objects of TableDDLCommand's, not strings.
* WrapTableDDLCommands takes a list of command strings and wraps them
* in TableDDLCommand structs.
*/
List *
GetViewCreationTableDDLCommandsOfTable(Oid relationId)
static List *
WrapTableDDLCommands(List *commandStrings)
{
List *commands = GetViewCreationCommandsOfTable(relationId);
List *tableDDLCommands = NIL;
char *command = NULL;
foreach_ptr(command, commands)
foreach_ptr(command, commandStrings)
{
tableDDLCommands = lappend(tableDDLCommands, makeTableDDLCommandString(command));
}

View File

@ -85,6 +85,7 @@ static void DropRelationTruncateTriggers(Oid relationId);
static char * GetDropTriggerCommand(Oid relationId, char *triggerName);
static void DropViewsOnTable(Oid relationId);
static void DropIdentitiesOnTable(Oid relationId);
static void DropTableFromPublications(Oid relationId);
static List * GetRenameStatsCommandList(List *statsOidList, uint64 shardId);
static List * ReversedOidList(List *oidList);
static void AppendExplicitIndexIdsToList(Form_pg_index indexForm,
@ -338,6 +339,10 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
List *shellTableDDLEvents = GetShellTableDDLEventsForCitusLocalTable(relationId);
List *tableViewCreationCommands = GetViewCreationCommandsOfTable(relationId);
bool isAdd = true;
List *alterPublicationCommands =
GetAlterPublicationDDLCommandsForTable(relationId, isAdd);
char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
@ -347,6 +352,12 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
*/
DropIdentitiesOnTable(relationId);
/*
* We do not want the shard to be in the publication (subscribers are
* unlikely to recognize it).
*/
DropTableFromPublications(relationId);
/* below we convert relation with relationId to the shard relation */
uint64 shardId = ConvertLocalTableToShard(relationId);
@ -363,6 +374,11 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
*/
ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(tableViewCreationCommands);
/*
* Execute the publication creation commands with the shell table.
*/
ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(alterPublicationCommands);
/*
* Set shellRelationId as the relation with relationId now points
* to the shard relation.
@ -1171,6 +1187,21 @@ DropIdentitiesOnTable(Oid relationId)
}
/*
 * DropTableFromPublications generates and executes ALTER PUBLICATION ..
 * DROP TABLE commands to remove the given relation from every publication
 * it currently belongs to.
 */
static void
DropTableFromPublications(Oid relationId)
{
	bool isAdd = false;

	ExecuteAndLogUtilityCommandList(
		GetAlterPublicationDDLCommandsForTable(relationId, isAdd));
}
/*
* DropViewsOnTable drops the views that depend on the given relation.
*/

View File

@ -438,6 +438,11 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return DDLCommands;
}
case OCLASS_PUBLICATION:
{
return CreatePublicationDDLCommandsIdempotent(dependency);
}
case OCLASS_ROLE:
{
return GenerateCreateOrAlterRoleCommand(dependency->objectId);

View File

@ -245,6 +245,15 @@ static DistributeObjectOps Any_CreatePolicy = {
.address = NULL,
.markDistributed = false,
};
/*
 * CREATE PUBLICATION is propagated in postprocess (preprocess is NULL):
 * the publication must exist locally before its object address can be
 * resolved and its definition deparsed for the workers.
 */
static DistributeObjectOps Any_CreatePublication = {
	.deparse = DeparseCreatePublicationStmt,
	.qualify = QualifyCreatePublicationStmt,
	.preprocess = NULL,
	.postprocess = PostProcessCreatePublicationStmt,
	.operationType = DIST_OPS_CREATE,
	.address = CreatePublicationStmtObjectAddress,
	.markDistributed = true,
};
static DistributeObjectOps Any_CreateRole = {
.deparse = DeparseCreateRoleStmt,
.qualify = NULL,
@ -707,6 +716,45 @@ static DistributeObjectOps Procedure_Rename = {
.address = RenameFunctionStmtObjectAddress,
.markDistributed = false,
};
/*
 * ALTER PUBLICATION uses a publication-specific preprocess hook
 * (PreprocessAlterPublicationStmt) rather than the generic one; see that
 * function for why (no sequential mode, NULL deparse is tolerated).
 */
static DistributeObjectOps Publication_Alter = {
	.deparse = DeparseAlterPublicationStmt,
	.qualify = QualifyAlterPublicationStmt,
	.preprocess = PreprocessAlterPublicationStmt,
	.postprocess = PostprocessAlterDistributedObjectStmt,
	.objectType = OBJECT_PUBLICATION,
	.operationType = DIST_OPS_ALTER,
	.address = AlterPublicationStmtObjectAddress,
	.markDistributed = false,
};

/* ALTER PUBLICATION .. OWNER TO .. follows the generic alter path */
static DistributeObjectOps Publication_AlterOwner = {
	.deparse = DeparseAlterPublicationOwnerStmt,
	.qualify = NULL,
	.preprocess = PreprocessAlterDistributedObjectStmt,
	.postprocess = PostprocessAlterDistributedObjectStmt,
	.objectType = OBJECT_PUBLICATION,
	.operationType = DIST_OPS_ALTER,
	.address = AlterPublicationOwnerStmtObjectAddress,
	.markDistributed = false,
};

/* DROP PUBLICATION follows the generic distributed-object drop path */
static DistributeObjectOps Publication_Drop = {
	.deparse = DeparseDropPublicationStmt,
	.qualify = NULL,
	.preprocess = PreprocessDropDistributedObjectStmt,
	.postprocess = NULL,
	.operationType = DIST_OPS_DROP,
	.address = NULL,
	.markDistributed = false,
};

/* ALTER PUBLICATION .. RENAME TO .. follows the generic alter path */
static DistributeObjectOps Publication_Rename = {
	.deparse = DeparseRenamePublicationStmt,
	.qualify = NULL,
	.preprocess = PreprocessAlterDistributedObjectStmt,
	.postprocess = NULL,
	.objectType = OBJECT_PUBLICATION,
	.operationType = DIST_OPS_ALTER,
	.address = RenamePublicationStmtObjectAddress,
	.markDistributed = false,
};
static DistributeObjectOps Routine_AlterObjectDepends = {
.deparse = DeparseAlterFunctionDependsStmt,
.qualify = QualifyAlterFunctionDependsStmt,
@ -1399,6 +1447,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_AlterOwner;
}
case OBJECT_PUBLICATION:
{
return &Publication_AlterOwner;
}
case OBJECT_ROUTINE:
{
return &Routine_AlterOwner;
@ -1436,6 +1489,11 @@ GetDistributeObjectOps(Node *node)
return &Any_AlterPolicy;
}
case T_AlterPublicationStmt:
{
return &Publication_Alter;
}
case T_AlterRoleStmt:
{
return &Any_AlterRole;
@ -1610,6 +1668,11 @@ GetDistributeObjectOps(Node *node)
return &Any_CreatePolicy;
}
case T_CreatePublicationStmt:
{
return &Any_CreatePublication;
}
case T_CreateRoleStmt:
{
return &Any_CreateRole;
@ -1722,6 +1785,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_Drop;
}
case OBJECT_PUBLICATION:
{
return &Publication_Drop;
}
case OBJECT_ROUTINE:
{
return &Routine_Drop;
@ -1901,6 +1969,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_Rename;
}
case OBJECT_PUBLICATION:
{
return &Publication_Rename;
}
case OBJECT_ROUTINE:
{
return &Routine_Rename;

View File

@ -0,0 +1,634 @@
/*-------------------------------------------------------------------------
*
* publication.c
* Commands for creating publications
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/reference_table_utils.h"
#include "distributed/worker_create_or_replace.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "pg_version_compat.h"
static CreatePublicationStmt * BuildCreatePublicationStmt(Oid publicationId);
#if (PG_VERSION_NUM >= PG_VERSION_15)
static PublicationObjSpec * BuildPublicationRelationObjSpec(Oid relationId,
Oid publicationId,
bool tableOnly);
#endif
static void AppendPublishOptionList(StringInfo str, List *strings);
static char * AlterPublicationOwnerCommand(Oid publicationId);
static bool ShouldPropagateCreatePublication(CreatePublicationStmt *stmt);
static List * ObjectAddressForPublicationName(char *publicationName, bool missingOk);
/*
 * PostProcessCreatePublicationStmt handles CREATE PUBLICATION statements
 * that contain distributed tables, propagating the publication to all
 * worker nodes.
 */
List *
PostProcessCreatePublicationStmt(Node *node, const char *queryString)
{
	CreatePublicationStmt *createStmt = castNode(CreatePublicationStmt, node);

	if (!ShouldPropagateCreatePublication(createStmt))
	{
		/* should not propagate right now */
		return NIL;
	}

	/* resolves the address via CreatePublicationStmtObjectAddress */
	List *addresses = GetObjectAddressListFromParseTree(node, false, true);

	/* this code path only supports a single object */
	Assert(list_length(addresses) == 1);

	if (IsAnyObjectAddressOwnedByExtension(addresses, NULL))
	{
		/* publications owned by extensions are managed by the extension */
		return NIL;
	}

	EnsureAllObjectDependenciesExistOnAllNodes(addresses);

	const ObjectAddress *publicationAddress = linitial(addresses);

	/* wrap the command to avoid recursive propagation on the workers */
	List *commands =
		list_make3(DISABLE_DDL_PROPAGATION,
				   CreatePublicationDDLCommand(publicationAddress->objectId),
				   ENABLE_DDL_PROPAGATION);

	return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* CreatePublicationDDLCommandsIdempotent returns a list of DDL statements to be
* executed on a node to recreate the publication addressed by the publicationAddress.
*/
List *
CreatePublicationDDLCommandsIdempotent(const ObjectAddress *publicationAddress)
{
Assert(publicationAddress->classId == PublicationRelationId);
char *ddlCommand =
CreatePublicationDDLCommand(publicationAddress->objectId);
char *alterPublicationOwnerSQL =
AlterPublicationOwnerCommand(publicationAddress->objectId);
return list_make2(
WrapCreateOrReplace(ddlCommand),
alterPublicationOwnerSQL);
}
/*
 * CreatePublicationDDLCommand returns a CREATE PUBLICATION command string
 * that can be used to recreate the given publication.
 */
char *
CreatePublicationDDLCommand(Oid publicationId)
{
	CreatePublicationStmt *statement = BuildCreatePublicationStmt(publicationId);

	/* the WHERE clause comes from the catalog, where it is already transformed */
	bool whereClauseRequiresTransform = false;

	/* propagated publications only contain Citus tables */
	bool includeLocalTables = false;

	return DeparseCreatePublicationStmtExtended((Node *) statement,
												whereClauseRequiresTransform,
												includeLocalTables);
}
/*
 * BuildCreatePublicationStmt constructs a CreatePublicationStmt struct for
 * the given publication by reading its definition from the catalogs.
 *
 * Fix 1: the original called ReleaseSysCache() right after reading pubname
 * and puballtables, but kept dereferencing publicationForm (which points
 * into the released tuple) for pubviaroot/pubinsert/... afterwards — a
 * use-after-release. The tuple is now released only after the last access.
 * Fix 2: Oid is unsigned, so it is printed with %u rather than %d.
 */
static CreatePublicationStmt *
BuildCreatePublicationStmt(Oid publicationId)
{
	CreatePublicationStmt *createPubStmt = makeNode(CreatePublicationStmt);

	HeapTuple publicationTuple =
		SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(publicationId));

	if (!HeapTupleIsValid(publicationTuple))
	{
		ereport(ERROR, (errmsg("cannot find publication with oid: %u",
							   publicationId)));
	}

	Form_pg_publication publicationForm =
		(Form_pg_publication) GETSTRUCT(publicationTuple);

	/* CREATE PUBLICATION <name> */
	createPubStmt->pubname = pstrdup(NameStr(publicationForm->pubname));

	/* FOR ALL TABLES */
	createPubStmt->for_all_tables = publicationForm->puballtables;

#if (PG_VERSION_NUM >= PG_VERSION_15)
	/* FOR TABLES IN SCHEMA .. */
	List *schemaIds = GetPublicationSchemas(publicationId);
	Oid schemaId = InvalidOid;

	foreach_oid(schemaId, schemaIds)
	{
		char *schemaName = get_namespace_name(schemaId);

		PublicationObjSpec *publicationObject = makeNode(PublicationObjSpec);
		publicationObject->pubobjtype = PUBLICATIONOBJ_TABLES_IN_SCHEMA;
		publicationObject->pubtable = NULL;
		publicationObject->name = schemaName;
		publicationObject->location = -1;

		createPubStmt->pubobjects = lappend(createPubStmt->pubobjects,
											publicationObject);
	}
#endif

	List *relationIds = GetPublicationRelations(publicationId,
												publicationForm->pubviaroot ?
												PUBLICATION_PART_ROOT :
												PUBLICATION_PART_LEAF);
	Oid relationId = InvalidOid;
	int citusTableCount PG_USED_FOR_ASSERTS_ONLY = 0;

	/* mainly for consistent ordering in test output */
	relationIds = SortList(relationIds, CompareOids);

	foreach_oid(relationId, relationIds)
	{
#if (PG_VERSION_NUM >= PG_VERSION_15)
		bool tableOnly = false;

		/* since postgres 15, tables can have a column list and filter */
		PublicationObjSpec *publicationObject =
			BuildPublicationRelationObjSpec(relationId, publicationId, tableOnly);

		createPubStmt->pubobjects = lappend(createPubStmt->pubobjects,
											publicationObject);
#else

		/* before postgres 15, only full tables are supported */
		char *schemaName = get_namespace_name(get_rel_namespace(relationId));
		char *tableName = get_rel_name(relationId);
		RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1);

		createPubStmt->tables = lappend(createPubStmt->tables, rangeVar);
#endif

		if (IsCitusTable(relationId))
		{
			citusTableCount++;
		}
	}

	/* WITH (publish_via_partition_root = true) option */
	bool publishViaRoot = publicationForm->pubviaroot;
	char *publishViaRootString = publishViaRoot ? "true" : "false";
	DefElem *pubViaRootOption = makeDefElem("publish_via_partition_root",
											(Node *) makeString(publishViaRootString),
											-1);
	createPubStmt->options = lappend(createPubStmt->options, pubViaRootOption);

	/* WITH (publish = 'insert, update, delete, truncate') option */
	List *publishList = NIL;

	if (publicationForm->pubinsert)
	{
		publishList = lappend(publishList, makeString("insert"));
	}

	if (publicationForm->pubupdate)
	{
		publishList = lappend(publishList, makeString("update"));
	}

	if (publicationForm->pubdelete)
	{
		publishList = lappend(publishList, makeString("delete"));
	}

	if (publicationForm->pubtruncate)
	{
		publishList = lappend(publishList, makeString("truncate"));
	}

	if (list_length(publishList) > 0)
	{
		StringInfo optionValue = makeStringInfo();
		AppendPublishOptionList(optionValue, publishList);

		DefElem *publishOption = makeDefElem("publish",
											 (Node *) makeString(optionValue->data),
											 -1);
		createPubStmt->options = lappend(createPubStmt->options, publishOption);
	}

	/* publicationForm points into the tuple; only release it after last use */
	ReleaseSysCache(publicationTuple);

	return createPubStmt;
}
/*
 * AppendPublishOptionList appends a list of publish option names (String
 * nodes) to the buffer in comma-separated form.
 */
static void
AppendPublishOptionList(StringInfo str, List *options)
{
	bool firstOption = true;
	ListCell *optionCell = NULL;

	foreach(optionCell, options)
	{
		const char *optionName = strVal(lfirst(optionCell));

		if (!firstOption)
		{
			appendStringInfoString(str, ", ");
		}
		firstOption = false;

		/* we cannot escape these strings */
		appendStringInfoString(str, optionName);
	}
}
#if (PG_VERSION_NUM >= PG_VERSION_15)

/*
 * BuildPublicationRelationObjSpec returns a PublicationObjSpec for the given
 * relation that can be included in a CREATE or ALTER PUBLICATION statement.
 *
 * When tableOnly is set, the column list and WHERE clause are omitted and
 * only the (qualified) table name is emitted — as required for DROP TABLE
 * actions.
 *
 * Fix: Oid is unsigned, so the error message uses %u instead of %d.
 */
static PublicationObjSpec *
BuildPublicationRelationObjSpec(Oid relationId, Oid publicationId,
								bool tableOnly)
{
	HeapTuple pubRelationTuple = SearchSysCache2(PUBLICATIONRELMAP,
												 ObjectIdGetDatum(relationId),
												 ObjectIdGetDatum(publicationId));
	if (!HeapTupleIsValid(pubRelationTuple))
	{
		ereport(ERROR, (errmsg("cannot find relation with oid %u in publication "
							   "with oid %u", relationId, publicationId)));
	}

	List *columnNameList = NIL;
	Node *whereClause = NULL;

	/* build the column list */
	if (!tableOnly)
	{
		bool isNull = false;
		Datum attributesDatum = SysCacheGetAttr(PUBLICATIONRELMAP, pubRelationTuple,
												Anum_pg_publication_rel_prattrs,
												&isNull);
		if (!isNull)
		{
			/* prattrs is an int2vector of attribute numbers */
			ArrayType *attributesArray = DatumGetArrayTypeP(attributesDatum);
			int attributeCount = ARR_DIMS(attributesArray)[0];
			int16 *elems = (int16 *) ARR_DATA_PTR(attributesArray);

			for (int attNumIndex = 0; attNumIndex < attributeCount; attNumIndex++)
			{
				AttrNumber attributeNumber = elems[attNumIndex];
				char *columnName = get_attname(relationId, attributeNumber, false);

				columnNameList = lappend(columnNameList, makeString(columnName));
			}
		}

		/* build the WHERE clause */
		Datum whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, pubRelationTuple,
												 Anum_pg_publication_rel_prqual,
												 &isNull);
		if (!isNull)
		{
			/*
			 * We use the already-transformed parse tree form here, which does
			 * not match a regular CreatePublicationStmt.
			 */
			whereClause = stringToNode(TextDatumGetCString(whereClauseDatum));
		}
	}

	ReleaseSysCache(pubRelationTuple);

	char *schemaName = get_namespace_name(get_rel_namespace(relationId));
	char *tableName = get_rel_name(relationId);
	RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1);

	/* build the FOR TABLE */
	PublicationTable *publicationTable = makeNode(PublicationTable);
	publicationTable->relation = rangeVar;
	publicationTable->whereClause = whereClause;
	publicationTable->columns = columnNameList;

	PublicationObjSpec *publicationObject = makeNode(PublicationObjSpec);
	publicationObject->pubobjtype = PUBLICATIONOBJ_TABLE;
	publicationObject->pubtable = publicationTable;
	publicationObject->name = NULL;
	publicationObject->location = -1;

	return publicationObject;
}

#endif
/*
 * PreprocessAlterPublicationStmt handles ALTER PUBLICATION statements
 * in a way that is mostly similar to PreprocessAlterDistributedObjectStmt,
 * except we do not ensure sequential mode (publications do not interact with
 * shards) and can handle NULL deparse commands for ALTER PUBLICATION commands
 * that only involve local tables.
 *
 * Returns the DDL tasks to run on non-coordinator nodes, or NIL when the
 * statement should not be propagated.
 */
List *
PreprocessAlterPublicationStmt(Node *stmt, const char *queryString,
							   ProcessUtilityContext processUtilityContext)
{
	List *addresses = GetObjectAddressListFromParseTree(stmt, false, false);

	/* the code-path only supports a single object */
	Assert(list_length(addresses) == 1);

	if (!ShouldPropagateAnyObject(addresses))
	{
		/* publication is not distributed, or propagation is disabled */
		return NIL;
	}

	/* distributed DDL may only originate from the coordinator */
	EnsureCoordinator();

	/* fully qualify relation names before deparsing */
	QualifyTreeNode(stmt);

	const char *sql = DeparseTreeNode((Node *) stmt);
	if (sql == NULL)
	{
		/*
		 * Deparsing logic decided that there is nothing to propagate, e.g.
		 * because the command only concerns local tables.
		 */
		return NIL;
	}

	/* wrap the command to avoid recursive propagation on the workers */
	List *commands = list_make3(DISABLE_DDL_PROPAGATION,
								(void *) sql,
								ENABLE_DDL_PROPAGATION);

	return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
 * GetAlterPublicationDDLCommandsForTable returns one ALTER PUBLICATION ..
 * ADD/DROP TABLE command per publication the given table belongs to.
 *
 * When isAdd is true the commands add the table to each publication;
 * otherwise they drop it.
 */
List *
GetAlterPublicationDDLCommandsForTable(Oid relationId, bool isAdd)
{
	List *ddlCommands = NIL;
	List *publicationIds = GetRelationPublications(relationId);
	Oid publicationId = InvalidOid;

	foreach_oid(publicationId, publicationIds)
	{
		ddlCommands = lappend(ddlCommands,
							  GetAlterPublicationTableDDLCommand(publicationId,
																 relationId,
																 isAdd));
	}

	return ddlCommands;
}
/*
 * GetAlterPublicationTableDDLCommand generates an ALTER PUBLICATION ..
 * ADD/DROP TABLE command for the given publication and relation ID.
 *
 * If isAdd is true, it returns an ALTER PUBLICATION .. ADD TABLE command.
 * Otherwise, it returns an ALTER PUBLICATION .. DROP TABLE command.
 *
 * Fix: Oid is unsigned, so the error message uses %u instead of %d.
 */
char *
GetAlterPublicationTableDDLCommand(Oid publicationId, Oid relationId,
								   bool isAdd)
{
	HeapTuple pubTuple = SearchSysCache1(PUBLICATIONOID,
										 ObjectIdGetDatum(publicationId));
	if (!HeapTupleIsValid(pubTuple))
	{
		ereport(ERROR, (errmsg("cannot find publication with oid: %u",
							   publicationId)));
	}

	Form_pg_publication pubForm = (Form_pg_publication) GETSTRUCT(pubTuple);

	AlterPublicationStmt *alterPubStmt = makeNode(AlterPublicationStmt);
	alterPubStmt->pubname = pstrdup(NameStr(pubForm->pubname));

	/* pubForm is no longer accessed after this point */
	ReleaseSysCache(pubTuple);

#if (PG_VERSION_NUM >= PG_VERSION_15)

	/* DROP only needs the table name; ADD includes column list and filter */
	bool tableOnly = !isAdd;

	/* since postgres 15, tables can have a column list and filter */
	PublicationObjSpec *publicationObject =
		BuildPublicationRelationObjSpec(relationId, publicationId, tableOnly);

	alterPubStmt->pubobjects = lappend(alterPubStmt->pubobjects, publicationObject);
	alterPubStmt->action = isAdd ? AP_AddObjects : AP_DropObjects;
#else

	/* before postgres 15, only full tables are supported */
	char *schemaName = get_namespace_name(get_rel_namespace(relationId));
	char *tableName = get_rel_name(relationId);
	RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1);

	alterPubStmt->tables = lappend(alterPubStmt->tables, rangeVar);
	alterPubStmt->tableAction = isAdd ? DEFELEM_ADD : DEFELEM_DROP;
#endif

	/* we take the WHERE clause from the catalog where it is already transformed */
	bool whereClauseNeedsTransform = false;

	/*
	 * We use these commands to restore publications before/after transforming a
	 * table, including transformations to/from local tables.
	 */
	bool includeLocalTables = true;

	char *command = DeparseAlterPublicationStmtExtended((Node *) alterPubStmt,
														whereClauseNeedsTransform,
														includeLocalTables);

	return command;
}
/*
 * AlterPublicationOwnerCommand returns an "ALTER PUBLICATION .. OWNER TO .."
 * statement for the specified publication.
 *
 * Fix: Oid is unsigned, so the error message uses %u instead of %d.
 */
static char *
AlterPublicationOwnerCommand(Oid publicationId)
{
	HeapTuple publicationTuple =
		SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(publicationId));

	if (!HeapTupleIsValid(publicationTuple))
	{
		ereport(ERROR, (errmsg("cannot find publication with oid: %u",
							   publicationId)));
	}

	Form_pg_publication publicationForm =
		(Form_pg_publication) GETSTRUCT(publicationTuple);

	/* publicationName points into the tuple; it is copied into the command
	 * by appendStringInfo before the tuple is released below */
	char *publicationName = NameStr(publicationForm->pubname);
	Oid publicationOwnerId = publicationForm->pubowner;

	char *publicationOwnerName = GetUserNameFromId(publicationOwnerId, false);

	StringInfo alterCommand = makeStringInfo();
	appendStringInfo(alterCommand, "ALTER PUBLICATION %s OWNER TO %s",
					 quote_identifier(publicationName),
					 quote_identifier(publicationOwnerName));

	ReleaseSysCache(publicationTuple);

	return alterCommand->data;
}
/*
 * ShouldPropagateCreatePublication tests if we need to propagate a
 * CREATE PUBLICATION statement.
 */
static bool
ShouldPropagateCreatePublication(CreatePublicationStmt *stmt)
{
	/* both general propagation and create-in-transaction checks must pass */
	return ShouldPropagate() && ShouldPropagateCreateInCoordinatedTransction();
}
/*
 * AlterPublicationStmtObjectAddress generates the object address for the
 * publication altered by a regular ALTER PUBLICATION .. statement.
 */
List *
AlterPublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess)
{
	AlterPublicationStmt *alterStmt = castNode(AlterPublicationStmt, node);
	char *publicationName = alterStmt->pubname;

	return ObjectAddressForPublicationName(publicationName, missingOk);
}
/*
 * AlterPublicationOwnerStmtObjectAddress generates the object address for the
 * publication altered by the given ALTER PUBLICATION .. OWNER TO statement.
 */
List *
AlterPublicationOwnerStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess)
{
	AlterOwnerStmt *ownerStmt = castNode(AlterOwnerStmt, node);
	char *publicationName = strVal(ownerStmt->object);

	return ObjectAddressForPublicationName(publicationName, missingOk);
}
/*
 * CreatePublicationStmtObjectAddress generates the object address for the
 * publication created by the given CREATE PUBLICATION statement.
 */
List *
CreatePublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess)
{
	CreatePublicationStmt *createStmt = castNode(CreatePublicationStmt, node);
	char *publicationName = createStmt->pubname;

	return ObjectAddressForPublicationName(publicationName, missingOk);
}
/*
 * RenamePublicationStmtObjectAddress generates the object address for the
 * publication altered by the given ALTER PUBLICATION .. RENAME TO statement.
 */
List *
RenamePublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostprocess)
{
	RenameStmt *renameStmt = castNode(RenameStmt, node);
	char *publicationName = strVal(renameStmt->object);

	return ObjectAddressForPublicationName(publicationName, missingOk);
}
/*
 * ObjectAddressForPublicationName returns the object address for a given
 * publication name, as a single-element list.
 *
 * When the publication does not exist: if missingOk is true, the returned
 * address carries InvalidOid as its object id; otherwise an
 * ERRCODE_UNDEFINED_OBJECT error is raised.
 */
static List *
ObjectAddressForPublicationName(char *publicationName, bool missingOk)
{
	Oid publicationId = InvalidOid;

	HeapTuple publicationTuple =
		SearchSysCache1(PUBLICATIONNAME, CStringGetDatum(publicationName));
	if (HeapTupleIsValid(publicationTuple))
	{
		Form_pg_publication publicationForm =
			(Form_pg_publication) GETSTRUCT(publicationTuple);
		publicationId = publicationForm->oid;

		ReleaseSysCache(publicationTuple);
	}
	else if (!missingOk)
	{
		/* it should have just been created */
		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
						errmsg("publication \"%s\" does not exist",
							   publicationName)));
	}

	/* with missingOk, publicationId may still be InvalidOid here */
	ObjectAddress *address = palloc0(sizeof(ObjectAddress));
	ObjectAddressSet(*address, PublicationRelationId, publicationId);

	return list_make1(address);
}

View File

@ -1372,7 +1372,7 @@ convert_aclright_to_string(int aclright)
/*
* contain_nextval_expression_walker walks over expression tree and returns
* true if it contains call to 'nextval' function.
* true if it contains call to 'nextval' function or it has an identity column.
*/
bool
contain_nextval_expression_walker(Node *node, void *context)
@ -1382,6 +1382,13 @@ contain_nextval_expression_walker(Node *node, void *context)
return false;
}
/* check if the node contains an identity column */
if (IsA(node, NextValueExpr))
{
return true;
}
/* check if the node contains call to 'nextval' */
if (IsA(node, FuncExpr))
{
FuncExpr *funcExpr = (FuncExpr *) node;

View File

@ -0,0 +1,690 @@
/*-------------------------------------------------------------------------
*
* deparse_publication_stmts.c
* All routines to deparse publication statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/relation.h"
#include "catalog/namespace.h"
#include "commands/defrem.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/namespace_utils.h"
#include "lib/stringinfo.h"
#include "parser/parse_clause.h"
#include "parser/parse_collate.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "nodes/value.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/ruleutils.h"
static void AppendCreatePublicationStmt(StringInfo buf, CreatePublicationStmt *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables);
#if (PG_VERSION_NUM >= PG_VERSION_15)
static bool AppendPublicationObjects(StringInfo buf, List *publicationObjects,
bool whereClauseNeedsTransform,
bool includeLocalTables);
static void AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName,
Node *whereClause,
bool whereClauseNeedsTransform);
static void AppendAlterPublicationAction(StringInfo buf, AlterPublicationAction action);
#else
static bool AppendTables(StringInfo buf, List *tables, bool includeLocalTables);
static void AppendDefElemAction(StringInfo buf, DefElemAction action);
#endif
static bool AppendAlterPublicationStmt(StringInfo buf, AlterPublicationStmt *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables);
static void AppendDropPublicationStmt(StringInfo buf, DropStmt *stmt);
static void AppendRenamePublicationStmt(StringInfo buf, RenameStmt *stmt);
static void AppendAlterPublicationOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendPublicationOptions(StringInfo stringBuffer, List *optionList);
static void AppendIdentifierList(StringInfo buf, List *objects);
/*
 * DeparseCreatePublicationStmt builds and returns a string representing a
 * CreatePublicationStmt.
 */
char *
DeparseCreatePublicationStmt(Node *node)
{
	/* the regular deparsing path receives CREATE PUBLICATION from the parser */
	bool whereClauseNeedsTransform = false;

	/* regular CREATE PUBLICATION never propagates local tables */
	bool includeLocalTables = false;

	return DeparseCreatePublicationStmtExtended(node,
												whereClauseNeedsTransform,
												includeLocalTables);
}
/*
 * DeparseCreatePublicationStmtExtended builds and returns a string
 * representing a CreatePublicationStmt, which may contain
 * already-transformed expressions (e.g. WHERE clauses from the catalog).
 */
char *
DeparseCreatePublicationStmtExtended(Node *node, bool whereClauseNeedsTransform,
									 bool includeLocalTables)
{
	CreatePublicationStmt *createStmt = castNode(CreatePublicationStmt, node);

	StringInfo buffer = makeStringInfo();
	AppendCreatePublicationStmt(buffer, createStmt, whereClauseNeedsTransform,
								includeLocalTables);

	return buffer->data;
}
/*
 * AppendCreatePublicationStmt appends a string representing a
 * CreatePublicationStmt to a buffer.
 *
 * Local tables are skipped unless includeLocalTables is set; when none of
 * the listed objects qualify, the "FOR ..." part is omitted entirely.
 */
static void
AppendCreatePublicationStmt(StringInfo buf, CreatePublicationStmt *stmt,
							bool whereClauseNeedsTransform,
							bool includeLocalTables)
{
	appendStringInfo(buf, "CREATE PUBLICATION %s",
					 quote_identifier(stmt->pubname));

	if (stmt->for_all_tables)
	{
		/* FOR ALL TABLES is mutually exclusive with an explicit object list */
		appendStringInfoString(buf, " FOR ALL TABLES");
	}
#if (PG_VERSION_NUM >= PG_VERSION_15)
	else if (stmt->pubobjects != NIL)
	{
		bool hasObjects = false;
		PublicationObjSpec *publicationObject = NULL;

		/*
		 * Check whether there are objects to propagate, mainly to know whether
		 * we should include "FOR".
		 */
		foreach_ptr(publicationObject, stmt->pubobjects)
		{
			if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE)
			{
				/* FOR TABLE ... */
				PublicationTable *publicationTable = publicationObject->pubtable;

				if (includeLocalTables ||
					IsCitusTableRangeVar(publicationTable->relation, NoLock, false))
				{
					hasObjects = true;
					break;
				}
			}
			else
			{
				/* schema-level objects are always propagated */
				hasObjects = true;
				break;
			}
		}

		if (hasObjects)
		{
			appendStringInfoString(buf, " FOR");
			AppendPublicationObjects(buf, stmt->pubobjects, whereClauseNeedsTransform,
									 includeLocalTables);
		}
	}
#else
	else if (stmt->tables != NIL)
	{
		bool hasTables = false;
		RangeVar *rangeVar = NULL;

		/*
		 * Check whether there are tables to propagate, mainly to know whether
		 * we should include "FOR".
		 */
		foreach_ptr(rangeVar, stmt->tables)
		{
			if (includeLocalTables || IsCitusTableRangeVar(rangeVar, NoLock, false))
			{
				hasTables = true;
				break;
			}
		}

		if (hasTables)
		{
			appendStringInfoString(buf, " FOR");
			AppendTables(buf, stmt->tables, includeLocalTables);
		}
	}
#endif

	if (stmt->options != NIL)
	{
		/* WITH (publish = ..., publish_via_partition_root = ...) */
		appendStringInfoString(buf, " WITH (");
		AppendPublicationOptions(buf, stmt->options);
		appendStringInfoString(buf, ")");
	}
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
/*
 * AppendPublicationObjects appends a string representing a list of publication
 * objects to a buffer.
 *
 * For instance: TABLE users, departments, TABLES IN SCHEMA production
 *
 * Local tables are skipped unless includeLocalTables is set. Returns whether
 * at least one object was appended.
 */
static bool
AppendPublicationObjects(StringInfo buf, List *publicationObjects,
						 bool whereClauseNeedsTransform,
						 bool includeLocalTables)
{
	PublicationObjSpec *publicationObject = NULL;
	bool appendedObject = false;

	foreach_ptr(publicationObject, publicationObjects)
	{
		if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE)
		{
			/* FOR TABLE ... */
			PublicationTable *publicationTable = publicationObject->pubtable;
			RangeVar *rangeVar = publicationTable->relation;
			char *schemaName = rangeVar->schemaname;
			char *tableName = rangeVar->relname;

			if (!includeLocalTables && !IsCitusTableRangeVar(rangeVar, NoLock, false))
			{
				/* do not propagate local tables */
				continue;
			}

			if (schemaName != NULL)
			{
				/* qualified table name */
				appendStringInfo(buf, "%s TABLE %s",
								 appendedObject ? "," : "",
								 quote_qualified_identifier(schemaName, tableName));
			}
			else
			{
				/* unqualified table name */
				appendStringInfo(buf, "%s TABLE %s",
								 appendedObject ? "," : "",
								 quote_identifier(tableName));
			}

			/* optional column list: TABLE t (a, b) */
			if (publicationTable->columns != NIL)
			{
				appendStringInfoString(buf, " (");
				AppendIdentifierList(buf, publicationTable->columns);
				appendStringInfoString(buf, ")");
			}

			/* optional row filter: TABLE t WHERE (...) */
			if (publicationTable->whereClause != NULL)
			{
				appendStringInfoString(buf, " WHERE (");

				AppendWhereClauseExpression(buf, rangeVar,
											publicationTable->whereClause,
											whereClauseNeedsTransform);

				appendStringInfoString(buf, ")");
			}
		}
		else
		{
			/* FOR TABLES IN SCHEMA */
			char *schemaName = publicationObject->name;

			if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA)
			{
				/* resolve CURRENT_SCHEMA to a concrete schema name */
				List *searchPath = fetch_search_path(false);
				if (searchPath == NIL)
				{
					ereport(ERROR, errcode(ERRCODE_UNDEFINED_SCHEMA),
							errmsg("no schema has been selected for "
								   "CURRENT_SCHEMA"));
				}

				schemaName = get_namespace_name(linitial_oid(searchPath));
			}

			appendStringInfo(buf, "%s TABLES IN SCHEMA %s",
							 appendedObject ? "," : "",
							 quote_identifier(schemaName));
		}

		appendedObject = true;
	}

	return appendedObject;
}
/*
 * AppendWhereClauseExpression appends a deparsed expression that can
 * contain a filter on the given table. If whereClauseNeedsTransform is set
 * the expression is first transformed (parse-analyzed against the relation).
 */
static void
AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName,
							Node *whereClause, bool whereClauseNeedsTransform)
{
	/* keep the relation open (and locked) while deparsing references to it */
	Relation relation = relation_openrv(tableName, AccessShareLock);
	if (whereClauseNeedsTransform)
	{
		/* raw parser output: run parse analysis with the relation in scope */
		ParseState *pstate = make_parsestate(NULL);
		pstate->p_sourcetext = "";
		ParseNamespaceItem *nsitem = addRangeTableEntryForRelation(pstate,
																   relation,
																   AccessShareLock, NULL,
																   false, false);
		addNSItemToQuery(pstate, nsitem, false, true, true);
		/* copyObject: do not scribble on the caller's parse tree */
		whereClause = transformWhereClause(pstate,
										   copyObject(whereClause),
										   EXPR_KIND_WHERE,
										   "PUBLICATION WHERE");
		assign_expr_collations(pstate, whereClause);
	}
	List *relationContext = deparse_context_for(tableName->relname, relation->rd_id);
	/* empty search path so the deparsed expression is fully schema-qualified */
	PushOverrideEmptySearchPath(CurrentMemoryContext);
	char *whereClauseString = deparse_expression(whereClause,
												 relationContext,
												 true, true);
	PopOverrideSearchPath();
	appendStringInfoString(buf, whereClauseString);
	relation_close(relation, AccessShareLock);
}
#else
/*
 * AppendTables appends a string representing a list of publication tables
 * to a buffer (pre-PG15 counterpart of AppendPublicationObjects).
 *
 * For instance: TABLE users, departments
 *
 * Returns whether at least one table was appended; local tables are skipped
 * unless includeLocalTables is set.
 */
static bool
AppendTables(StringInfo buf, List *tables, bool includeLocalTables)
{
	RangeVar *rangeVar = NULL;
	bool appendedObject = false;
	foreach_ptr(rangeVar, tables)
	{
		if (!includeLocalTables &&
			!IsCitusTableRangeVar(rangeVar, NoLock, false))
		{
			/* do not propagate local tables */
			continue;
		}
		char *schemaName = rangeVar->schemaname;
		char *tableName = rangeVar->relname;
		if (schemaName != NULL)
		{
			/* qualified table name */
			appendStringInfo(buf, "%s %s",
							 appendedObject ? "," : " TABLE",
							 quote_qualified_identifier(schemaName, tableName));
		}
		else
		{
			/* unqualified table name */
			appendStringInfo(buf, "%s %s",
							 appendedObject ? "," : " TABLE",
							 quote_identifier(tableName));
		}
		appendedObject = true;
	}
	return appendedObject;
}
#endif
/*
 * DeparseAlterPublicationStmt builds and returns a string representing an
 * AlterPublicationStmt that comes straight from the parser.
 */
char *
DeparseAlterPublicationStmt(Node *node)
{
	/*
	 * Parser output still needs its WHERE clauses transformed, and regular
	 * ALTER PUBLICATION propagation never includes local tables.
	 */
	bool transformWhereClauses = true;
	bool propagateLocalTables = false;
	return DeparseAlterPublicationStmtExtended(node, transformWhereClauses,
											   propagateLocalTables);
}
/*
 * DeparseAlterPublicationStmtExtended builds and returns a string representing
 * an AlterPublicationStmt, which may have already-transformed expressions.
 * Returns NULL when there is no object left to propagate.
 */
char *
DeparseAlterPublicationStmtExtended(Node *node, bool whereClauseNeedsTransform,
									bool includeLocalTables)
{
	AlterPublicationStmt *alterStmt = castNode(AlterPublicationStmt, node);
	StringInfoData buf = { 0 };
	initStringInfo(&buf);
	bool appendedAnyObject = AppendAlterPublicationStmt(&buf, alterStmt,
														whereClauseNeedsTransform,
														includeLocalTables);
	if (appendedAnyObject)
	{
		return buf.data;
	}
	/*
	 * When there are no objects to propagate, there is no valid
	 * ALTER PUBLICATION to construct; this can only happen when local
	 * tables were skipped.
	 */
	Assert(!includeLocalTables);
	return NULL;
}
/*
 * AppendAlterPublicationStmt appends a string representing an AlterPublicationStmt
 * of the form ALTER PUBLICATION .. ADD/SET/DROP to the buffer.
 *
 * Returns whether a complete, propagatable command was produced.
 */
static bool
AppendAlterPublicationStmt(StringInfo buf, AlterPublicationStmt *stmt,
						   bool whereClauseNeedsTransform,
						   bool includeLocalTables)
{
	appendStringInfo(buf, "ALTER PUBLICATION %s",
					 quote_identifier(stmt->pubname));
	if (stmt->options)
	{
		/* ALTER PUBLICATION .. SET (option, ...) */
		appendStringInfoString(buf, " SET (");
		AppendPublicationOptions(buf, stmt->options);
		appendStringInfoString(buf, ")");
		/* changing options cannot be combined with other actions */
		return true;
	}
#if (PG_VERSION_NUM >= PG_VERSION_15)
	AppendAlterPublicationAction(buf, stmt->action);
	return AppendPublicationObjects(buf, stmt->pubobjects, whereClauseNeedsTransform,
									includeLocalTables);
#else
	AppendDefElemAction(buf, stmt->tableAction);
	return AppendTables(buf, stmt->tables, includeLocalTables);
#endif
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
/*
 * AppendAlterPublicationAction appends the ADD/DROP/SET keyword corresponding
 * to an AlterPublicationAction to the buffer.
 */
static void
AppendAlterPublicationAction(StringInfo buf, AlterPublicationAction action)
{
	const char *keyword = NULL;
	switch (action)
	{
		case AP_AddObjects:
		{
			keyword = " ADD";
			break;
		}
		case AP_DropObjects:
		{
			keyword = " DROP";
			break;
		}
		case AP_SetObjects:
		{
			keyword = " SET";
			break;
		}
		default:
		{
			ereport(ERROR, (errmsg("unrecognized publication action: %d", action)));
		}
	}
	appendStringInfoString(buf, keyword);
}
#else
/*
 * AppendDefElemAction appends the ADD/DROP/SET keyword corresponding to a
 * DefElemAction to the buffer (pre-PG15 counterpart of
 * AppendAlterPublicationAction).
 */
static void
AppendDefElemAction(StringInfo buf, DefElemAction action)
{
	switch (action)
	{
		case DEFELEM_ADD:
		{
			appendStringInfoString(buf, " ADD");
			break;
		}
		case DEFELEM_DROP:
		{
			appendStringInfoString(buf, " DROP");
			break;
		}
		case DEFELEM_SET:
		{
			appendStringInfoString(buf, " SET");
			break;
		}
		default:
		{
			ereport(ERROR, (errmsg("unrecognized publication action: %d", action)));
		}
	}
}
#endif
/*
 * DeparseDropPublicationStmt builds and returns a string representing the
 * given DROP PUBLICATION statement.
 */
char *
DeparseDropPublicationStmt(Node *node)
{
	DropStmt *dropStmt = castNode(DropStmt, node);
	Assert(dropStmt->removeType == OBJECT_PUBLICATION);
	StringInfoData buf = { 0 };
	initStringInfo(&buf);
	AppendDropPublicationStmt(&buf, dropStmt);
	return buf.data;
}
/*
 * AppendDropPublicationStmt appends a string representing the DropStmt
 * to the buffer.
 */
static void
AppendDropPublicationStmt(StringInfo buf, DropStmt *stmt)
{
	appendStringInfoString(buf, stmt->missing_ok ?
						   "DROP PUBLICATION IF EXISTS " :
						   "DROP PUBLICATION ");
	AppendIdentifierList(buf, stmt->objects);
	if (stmt->behavior == DROP_CASCADE)
	{
		appendStringInfoString(buf, " CASCADE");
	}
}
/*
 * DeparseRenamePublicationStmt builds and returns a string representing the
 * given ALTER PUBLICATION .. RENAME statement.
 */
char *
DeparseRenamePublicationStmt(Node *node)
{
	RenameStmt *renameStmt = castNode(RenameStmt, node);
	Assert(renameStmt->renameType == OBJECT_PUBLICATION);
	StringInfoData buf = { 0 };
	initStringInfo(&buf);
	AppendRenamePublicationStmt(&buf, renameStmt);
	return buf.data;
}
/*
 * AppendRenamePublicationStmt appends a string representing the RenameStmt
 * to the buffer.
 */
static void
AppendRenamePublicationStmt(StringInfo buf, RenameStmt *stmt)
{
	const char *currentName = quote_identifier(strVal(stmt->object));
	const char *newName = quote_identifier(stmt->newname);
	appendStringInfo(buf, "ALTER PUBLICATION %s RENAME TO %s;",
					 currentName, newName);
}
/*
 * DeparseAlterPublicationOwnerStmt builds and returns a string representing
 * the given ALTER PUBLICATION .. OWNER TO statement.
 */
char *
DeparseAlterPublicationOwnerStmt(Node *node)
{
	AlterOwnerStmt *ownerStmt = castNode(AlterOwnerStmt, node);
	Assert(ownerStmt->objectType == OBJECT_PUBLICATION);
	StringInfoData buf = { 0 };
	initStringInfo(&buf);
	AppendAlterPublicationOwnerStmt(&buf, ownerStmt);
	return buf.data;
}
/*
 * AppendAlterPublicationOwnerStmt appends a string representing the
 * AlterOwnerStmt to the buffer.
 */
static void
AppendAlterPublicationOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
{
	Assert(stmt->objectType == OBJECT_PUBLICATION);
	const char *publicationName = quote_identifier(strVal(stmt->object));
	const char *newOwnerName = RoleSpecString(stmt->newowner, true);
	appendStringInfo(buf, "ALTER PUBLICATION %s OWNER TO %s;",
					 publicationName, newOwnerName);
}
/*
 * AppendPublicationOptions appends a string representing a list of publication
 * options, e.g.: publish = 'insert, update', publish_via_partition_root = true.
 */
static void
AppendPublicationOptions(StringInfo stringBuffer, List *optionList)
{
	ListCell *optionCell = NULL;
	bool firstOptionPrinted = false;
	foreach(optionCell, optionList)
	{
		DefElem *option = (DefElem *) lfirst(optionCell);
		char *optionName = option->defname;
		char *optionValue = defGetString(option);
		NodeTag valueType = nodeTag(option->arg);
		if (firstOptionPrinted)
		{
			appendStringInfo(stringBuffer, ", ");
		}
		firstOptionPrinted = true;
		appendStringInfo(stringBuffer, "%s = ",
						 quote_identifier(optionName));
#if (PG_VERSION_NUM >= PG_VERSION_15)
		/* Boolean is a separate parse-node type as of PG 15 */
		if (valueType == T_Integer || valueType == T_Float || valueType == T_Boolean)
#else
		if (valueType == T_Integer || valueType == T_Float)
#endif
		{
			/* string escaping is unnecessary for numeric types and can cause issues */
			appendStringInfo(stringBuffer, "%s", optionValue);
		}
		else
		{
			appendStringInfo(stringBuffer, "%s", quote_literal_cstr(optionValue));
		}
	}
}
/*
 * AppendIdentifierList appends a comma-separated list of quoted identifiers
 * (String nodes) to the buffer.
 */
static void
AppendIdentifierList(StringInfo buf, List *objects)
{
	ListCell *objectCell = NULL;
	bool printedAny = false;
	foreach(objectCell, objects)
	{
		if (printedAny)
		{
			appendStringInfoString(buf, ", ");
		}
		appendStringInfoString(buf, quote_identifier(strVal(lfirst(objectCell))));
		printedAny = true;
	}
}

View File

@ -0,0 +1,119 @@
/*-------------------------------------------------------------------------
*
* qualify_publication_stmt.c
* Functions specialized in fully qualifying all publication statements. These
* functions are dispatched from qualify.c
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "nodes/nodes.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#if (PG_VERSION_NUM >= PG_VERSION_15)
static void QualifyPublicationObjects(List *publicationObjects);
#else
static void QualifyTables(List *tables);
#endif
static void QualifyPublicationRangeVar(RangeVar *publication);
/*
 * QualifyCreatePublicationStmt qualifies the table names of the
 * CREATE PUBLICATION statement.
 */
void
QualifyCreatePublicationStmt(Node *node)
{
	CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node);
#if (PG_VERSION_NUM >= PG_VERSION_15)
	QualifyPublicationObjects(stmt->pubobjects);
#else
	QualifyTables(stmt->tables);
#endif
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
/*
 * QualifyPublicationObjects ensures all table names in a list of
 * publication objects are fully qualified.
 */
static void
QualifyPublicationObjects(List *publicationObjects)
{
	PublicationObjSpec *publicationObject = NULL;
	foreach_ptr(publicationObject, publicationObjects)
	{
		if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE)
		{
			/* FOR TABLE ...; only TABLE entries carry a RangeVar to qualify */
			PublicationTable *publicationTable = publicationObject->pubtable;
			QualifyPublicationRangeVar(publicationTable->relation);
		}
	}
}
#else
/*
 * QualifyTables ensures all table names in a list are fully qualified
 * (pre-PG15 code path, where publications only reference plain tables).
 */
static void
QualifyTables(List *tables)
{
	RangeVar *rangeVar = NULL;
	foreach_ptr(rangeVar, tables)
	{
		QualifyPublicationRangeVar(rangeVar);
	}
}
#endif
/*
 * QualifyAlterPublicationStmt ensures all table names in the
 * ALTER PUBLICATION statement are fully qualified.
 */
void
QualifyAlterPublicationStmt(Node *node)
{
	AlterPublicationStmt *stmt = castNode(AlterPublicationStmt, node);
#if (PG_VERSION_NUM >= PG_VERSION_15)
	QualifyPublicationObjects(stmt->pubobjects);
#else
	QualifyTables(stmt->tables);
#endif
}
/*
 * QualifyPublicationRangeVar qualifies the given table RangeVar (as it appears
 * in a publication statement) if it is not yet schema-qualified.
 */
static void
QualifyPublicationRangeVar(RangeVar *publication)
{
	if (publication->schemaname == NULL)
	{
		/*
		 * NOTE(review): RelnameGetRelid returns InvalidOid when the name is
		 * not resolvable via the search path; presumably callers only reach
		 * this after the name was validated -- confirm with call sites.
		 */
		Oid publicationOid = RelnameGetRelid(publication->relname);
		Oid schemaOid = get_rel_namespace(publicationOid);
		publication->schemaname = get_namespace_name(schemaOid);
	}
}

View File

@ -802,6 +802,11 @@ GetObjectTypeString(ObjectType objType)
return "function";
}
case OBJECT_PUBLICATION:
{
return "publication";
}
case OBJECT_SCHEMA:
{
return "schema";

View File

@ -132,6 +132,7 @@ typedef struct ViewDependencyNode
static List * GetRelationSequenceDependencyList(Oid relationId);
static List * GetRelationFunctionDependencyList(Oid relationId);
static List * GetRelationTriggerFunctionDependencyList(Oid relationId);
static List * GetPublicationRelationsDependencyList(Oid relationId);
static List * GetRelationStatsSchemaDependencyList(Oid relationId);
static List * GetRelationIndicesDependencyList(Oid relationId);
static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid objectId);
@ -722,6 +723,11 @@ SupportedDependencyByCitus(const ObjectAddress *address)
return true;
}
case OCLASS_PUBLICATION:
{
return true;
}
case OCLASS_TSCONFIG:
{
return true;
@ -1656,6 +1662,36 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
List *ruleRefDepList = GetViewRuleReferenceDependencyList(relationId);
result = list_concat(result, ruleRefDepList);
}
break;
}
case PublicationRelationId:
{
Oid publicationId = target.objectId;
/*
* Publications do not depend directly on relations, because dropping
* the relation will only remove it from the publications. However,
* we add a dependency to ensure the relation is created first when
* adding a node.
*/
List *relationDependencyList =
GetPublicationRelationsDependencyList(publicationId);
result = list_concat(result, relationDependencyList);
/*
* As of PostgreSQL 15, the same applies to schemas.
*/
#if PG_VERSION_NUM >= PG_VERSION_15
List *schemaIdList =
GetPublicationSchemas(publicationId);
List *schemaDependencyList =
CreateObjectAddressDependencyDefList(NamespaceRelationId, schemaIdList);
result = list_concat(result, schemaDependencyList);
#endif
break;
}
default:
@ -1923,6 +1959,33 @@ GetRelationTriggerFunctionDependencyList(Oid relationId)
}
/*
 * GetPublicationRelationsDependencyList creates a list of ObjectAddressDependencies
 * for a publication on the Citus relations it contains. This helps make sure we
 * distribute Citus tables before local tables.
 */
static List *
GetPublicationRelationsDependencyList(Oid publicationId)
{
	List *allRelationIds = GetPublicationRelations(publicationId, PUBLICATION_PART_ROOT);
	List *citusRelationIds = NIL;
	Oid relationId = InvalidOid;
	foreach_oid(relationId, allRelationIds)
	{
		/* only Citus tables become dependencies; local tables are left out */
		if (IsCitusTable(relationId))
		{
			citusRelationIds = lappend_oid(citusRelationIds, relationId);
		}
	}
	return CreateObjectAddressDependencyDefList(RelationRelationId, citusRelationIds);
}
/*
* GetTypeConstraintDependencyDefinition creates a list of constraint dependency
* definitions for a given type

View File

@ -590,6 +590,18 @@ IsCitusTable(Oid relationId)
}
/*
 * IsCitusTableRangeVar returns whether the table named in the given
 * rangeVar is a Citus table.
 */
bool
IsCitusTableRangeVar(RangeVar *rangeVar, LOCKMODE lockMode, bool missingOK)
{
	return IsCitusTable(RangeVarGetRelid(rangeVar, lockMode, missingOK));
}
/*
* IsCitusTableViaCatalog returns whether the given relation is a
* distributed table or not.

View File

@ -100,6 +100,7 @@ static bool HasMetadataWorkers(void);
static void CreateShellTableOnWorkers(Oid relationId);
static void CreateTableMetadataOnWorkers(Oid relationId);
static void CreateDependingViewsOnWorkers(Oid relationId);
static void AddTableToPublications(Oid relationId);
static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
bool citusTableWithNoDistKey);
@ -302,7 +303,8 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
* Our definition of metadata includes the shell table and its inter relations with
* other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard
* and pg_dist_shard placement entries. This function also propagates the views that
* depend on the given relation, to the metadata workers.
* depend on the given relation, to the metadata workers, and adds the relation to
* the appropriate publications.
*/
void
SyncCitusTableMetadata(Oid relationId)
@ -319,6 +321,7 @@ SyncCitusTableMetadata(Oid relationId)
}
CreateDependingViewsOnWorkers(relationId);
AddTableToPublications(relationId);
}
@ -364,6 +367,49 @@ CreateDependingViewsOnWorkers(Oid relationId)
}
/*
 * AddTableToPublications adds the table to the distributed publications that
 * contain it, on all workers with metadata.
 */
static void
AddTableToPublications(Oid relationId)
{
	List *publicationIds = GetRelationPublications(relationId);
	if (publicationIds == NIL)
	{
		/* table is not part of any publication */
		return;
	}
	Oid publicationId = InvalidOid;
	/* the ALTER PUBLICATION commands must not be propagated recursively */
	SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
	foreach_oid(publicationId, publicationIds)
	{
		ObjectAddress *publicationAddress = palloc0(sizeof(ObjectAddress));
		ObjectAddressSet(*publicationAddress, PublicationRelationId, publicationId);
		List *addresses = list_make1(publicationAddress);
		if (!ShouldPropagateAnyObject(addresses))
		{
			/* skip non-distributed publications */
			continue;
		}
		/* ensure schemas exist */
		EnsureAllObjectDependenciesExistOnAllNodes(addresses);
		bool isAdd = true;
		char *alterPublicationCommand =
			GetAlterPublicationTableDDLCommand(publicationId, relationId, isAdd);
		/* send ALTER PUBLICATION .. ADD to workers with metadata */
		SendCommandToWorkersWithMetadata(alterPublicationCommand);
	}
	SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
}
/*
* EnsureSequentialModeMetadataOperations makes sure that the current transaction is
* already in sequential mode, or can still safely be put in sequential mode,

View File

@ -425,6 +425,7 @@ ErrorIfCurrentUserCanNotDistributeObject(char *textType, ObjectType type,
case OBJECT_COLLATION:
case OBJECT_VIEW:
case OBJECT_ROLE:
case OBJECT_PUBLICATION:
{
check_object_ownership(userId, type, *addr, node, *relation);
break;

View File

@ -190,6 +190,19 @@ typedef struct WorkerShardStatistics
HTAB *statistics;
} WorkerShardStatistics;
/*
 * ShardMoveDependencyInfo contains the taskId which any new shard move task
 * within the corresponding colocation group (or touching the corresponding
 * node) must take a dependency on.
 */
typedef struct ShardMoveDependencyInfo
{
	int64 key;    /* colocation id or node id, depending on the owning map */
	int64 taskId; /* latest scheduled move task for this key */
} ShardMoveDependencyInfo;
/*
 * ShardMoveDependencies tracks the latest scheduled shard move per colocation
 * group and per node, so conflicting moves can be chained via dependencies.
 */
typedef struct ShardMoveDependencies
{
	HTAB *colocationDependencies;
	HTAB *nodeDependencies;
} ShardMoveDependencies;
char *VariablesToBePassedToNewConnections = NULL;
/* static declarations for main logic */
@ -1898,6 +1911,137 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options)
}
/*
 * GetColocationId returns the colocation id of the shard moved by the given
 * PlacementUpdateEvent.
 */
static int64
GetColocationId(PlacementUpdateEvent *move)
{
	ShardInterval *shardInterval = LoadShardInterval(move->shardId);
	CitusTableCacheEntry *tableEntry =
		GetCitusTableCacheEntry(shardInterval->relationId);
	return tableEntry->colocationId;
}
/*
 * InitializeShardMoveDependencies creates the hash maps that we use to track
 * the latest moves so that subsequent moves with the same properties must take
 * a dependency on them. There are two hash maps. One is for tracking the
 * latest move scheduled in a given colocation group and the other one is for
 * tracking the latest move which involves a given node either as its source
 * node or its target node.
 */
static ShardMoveDependencies
InitializeShardMoveDependencies(void)
{
	ShardMoveDependencies shardMoveDependencies;
	/* keyed by colocation id: latest move task within the colocation group */
	shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64,
																				   ShardMoveDependencyInfo,
																				   "colocationDependencyHashMap",
																				   6);
	/* keyed by node id: latest move task using the node as source or target */
	shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
																			 ShardMoveDependencyInfo,
																			 "nodeDependencyHashMap",
																			 6);
	return shardMoveDependencies;
}
/*
 * GenerateTaskMoveDependencyList creates and returns an array of taskIds that
 * the move must take a dependency on, reporting its length via *nDepends.
 * Returns NULL when there are no dependencies.
 */
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
							   ShardMoveDependencies shardMoveDependencies, int *nDepends)
{
	/* a hash set deduplicates task ids across the three lookups below */
	HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
														   "shardMoveDependencyList", 0);
	bool found;
	/* Check if there exists a move in the same colocation group scheduled earlier. */
	/*
	 * NOTE(review): HASH_ENTER inserts an empty entry when the key is absent;
	 * its taskId is only read when found is true, and is filled in afterwards
	 * by UpdateShardMoveDependencies for the same keys -- confirm intended.
	 */
	ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
		shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found);
	if (found)
	{
		hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
	}
	/* Check if there exists a move scheduled earlier whose source or target node
	 * overlaps with the current move's source node. */
	shardMoveDependencyInfo = hash_search(
		shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
		&found);
	if (found)
	{
		hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
	}
	/* Check if there exists a move scheduled earlier whose source or target node
	 * overlaps with the current move's target node. */
	shardMoveDependencyInfo = hash_search(
		shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER,
		&found);
	if (found)
	{
		hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
	}
	*nDepends = hash_get_num_entries(dependsList);
	int64 *dependsArray = NULL;
	if (*nDepends > 0)
	{
		/* flatten the set into the array expected by ScheduleBackgroundTask */
		HASH_SEQ_STATUS seq;
		dependsArray = palloc((*nDepends) * sizeof(int64));
		hash_seq_init(&seq, dependsList);
		int i = 0;
		int64 *dependsTaskId;
		while ((dependsTaskId = (int64 *) hash_seq_search(&seq)) != NULL)
		{
			dependsArray[i++] = *dependsTaskId;
		}
	}
	return dependsArray;
}
/*
 * UpdateShardMoveDependencies updates both dependency maps with the latest
 * move's taskId: the move's colocation group, its source node and its target
 * node all now map to taskId.
 */
static void
UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
							ShardMoveDependencies shardMoveDependencies)
{
	ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
		shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
	shardMoveDependencyInfo->taskId = taskId;
	shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
										  &move->sourceNode->nodeId, HASH_ENTER, NULL);
	shardMoveDependencyInfo->taskId = taskId;
	shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
										  &move->targetNode->nodeId, HASH_ENTER, NULL);
	shardMoveDependencyInfo->taskId = taskId;
}
/*
* RebalanceTableShardsBackground rebalances the shards for the relations
* inside the relationIdList across the different workers. It does so using our
@ -1974,18 +2118,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
StringInfoData buf = { 0 };
initStringInfo(&buf);
/*
* Currently we only have two tasks that any move can depend on:
* - replicating reference tables
* - the previous move
*
* prevJobIdx tells what slot to write the id of the task into. We only use both slots
* if we are actually replicating reference tables.
*/
int64 prevJobId[2] = { 0 };
int prevJobIdx = 0;
List *referenceTableIdList = NIL;
int64 replicateRefTablesTaskId = 0;
if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
{
@ -2001,15 +2135,15 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevJobIdx] = task->taskid;
prevJobIdx++;
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
NULL);
replicateRefTablesTaskId = task->taskid;
}
PlacementUpdateEvent *move = NULL;
bool first = true;
int prevMoveIndex = prevJobIdx;
ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies();
foreach_ptr(move, placementUpdateList)
{
resetStringInfo(&buf);
@ -2021,14 +2155,27 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
move->targetNode->nodeId,
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevMoveIndex] = task->taskid;
if (first)
int64 colocationId = GetColocationId(move);
int nDepends = 0;
int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
shardMoveDependencies,
&nDepends);
if (nDepends == 0 && replicateRefTablesTaskId > 0)
{
first = false;
prevJobIdx++;
nDepends = 1;
dependsArray = palloc(nDepends * sizeof(int64));
dependsArray[0] = replicateRefTablesTaskId;
}
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
nDepends,
dependsArray);
UpdateShardMoveDependencies(move, colocationId, task->taskid,
shardMoveDependencies);
}
ereport(NOTICE,

View File

@ -88,6 +88,8 @@ static const char *replicationSlotPrefix[] = {
* IMPORTANT: All the subscription names should start with "citus_". Otherwise
* our utility hook does not defend against non-superusers altering or dropping
* them, which is important for security purposes.
*
* We should also keep these in sync with IsCitusShardTransferBackend().
*/
static const char *subscriptionPrefix[] = {
[SHARD_MOVE] = "citus_shard_move_subscription_",
@ -1338,7 +1340,9 @@ CreatePublications(MultiConnection *connection,
worker->groupId,
CLEANUP_ALWAYS);
ExecuteCriticalRemoteCommand(connection, DISABLE_DDL_PROPAGATION);
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
ExecuteCriticalRemoteCommand(connection, ENABLE_DDL_PROPAGATION);
pfree(createPublicationCommand->data);
pfree(createPublicationCommand);
}

View File

@ -1,3 +1,9 @@
-- citus--11.2-1--11.3-1
#include "udfs/repl_origin_helper/11.3-1.sql"
#include "udfs/worker_adjust_identity_column_seq_ranges/11.3-1.sql"
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY USING INDEX pg_dist_rebalance_strategy_name_key;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY USING INDEX pg_dist_shard_shardid_index;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_transaction_unique_constraint;

View File

@ -1,5 +1,19 @@
-- citus--11.3-1--11.2-1
DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active();
DROP FUNCTION IF EXISTS pg_catalog.worker_adjust_identity_column_seq_ranges(regclass);
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;

View File

@ -1452,6 +1452,21 @@ IsExternalClientBackend(void)
}
/*
 * IsCitusShardTransferBackend returns true if we are in a backend that citus
 * initiated for transferring a shard, detected via the application_name prefix.
 */
bool
IsCitusShardTransferBackend(void)
{
	int prefixLength = strlen(CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX);
	return strncmp(application_name,
				   CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX,
				   prefixLength) == 0;
}
/*
* DetermineCitusBackendType determines the type of backend based on the application_name.
*/

View File

@ -35,8 +35,22 @@
#include "distributed/worker_create_or_replace.h"
#include "distributed/worker_protocol.h"
/*
* OnCollisionAction describes what to do when the created object
* and existing object do not match.
*/
typedef enum OnCollisionAction
{
ON_COLLISION_RENAME,
ON_COLLISION_DROP
} OnCollisionAction;
static List * CreateStmtListByObjectAddress(const ObjectAddress *address);
static bool CompareStringList(List *list1, List *list2);
static OnCollisionAction GetOnCollisionAction(const ObjectAddress *address);
PG_FUNCTION_INFO_V1(worker_create_or_replace_object);
PG_FUNCTION_INFO_V1(worker_create_or_replace_object_array);
@ -192,7 +206,8 @@ WorkerCreateOrReplaceObject(List *sqlStatements)
/*
* Object with name from statement is already found locally, check if states are
* identical. If objects differ we will rename the old object (non- destructively)
* as to make room to create the new object according to the spec sent.
* or drop it (if safe) as to make room to create the new object according to the
* spec sent.
*/
/*
@ -213,11 +228,22 @@ WorkerCreateOrReplaceObject(List *sqlStatements)
return false;
}
char *newName = GenerateBackupNameForCollision(address);
Node *utilityStmt = NULL;
RenameStmt *renameStmt = CreateRenameStatement(address, newName);
const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt,
if (GetOnCollisionAction(address) == ON_COLLISION_DROP)
{
/* drop the existing object */
utilityStmt = (Node *) CreateDropStmt(address);
}
else
{
/* rename the existing object */
char *newName = GenerateBackupNameForCollision(address);
utilityStmt = (Node *) CreateRenameStatement(address, newName);
}
const char *commandString = DeparseTreeNode(utilityStmt);
ProcessUtilityParseTree(utilityStmt, commandString,
PROCESS_UTILITY_QUERY,
NULL, None_Receiver, NULL);
}
@ -286,6 +312,11 @@ CreateStmtListByObjectAddress(const ObjectAddress *address)
return list_make1(GetFunctionDDLCommand(address->objectId, false));
}
case OCLASS_PUBLICATION:
{
return list_make1(CreatePublicationDDLCommand(address->objectId));
}
case OCLASS_TSCONFIG:
{
List *stmts = GetCreateTextSearchConfigStatements(address);
@ -312,6 +343,37 @@ CreateStmtListByObjectAddress(const ObjectAddress *address)
}
/*
 * GetOnCollisionAction decides what to do when an object with the same name
 * as the one we want to create already exists on the worker.
 */
static OnCollisionAction
GetOnCollisionAction(const ObjectAddress *address)
{
	if (getObjectClass(address) == OCLASS_PUBLICATION)
	{
		/*
		 * We prefer to drop publications because they can be
		 * harmful (cause update/delete failures) and are relatively
		 * safe to drop.
		 */
		return ON_COLLISION_DROP;
	}
	/* collations, functions, text search objects, types, ... are renamed */
	return ON_COLLISION_RENAME;
}
/*
* GenerateBackupNameForCollision calculate a backup name for a given object by its
* address. This name should be used when renaming an existing object before creating the
@ -362,6 +424,64 @@ GenerateBackupNameForCollision(const ObjectAddress *address)
}
/*
* CreateDropPublicationStmt creates a DROP PUBLICATION statement for the
* publication at the given address.
*/
static DropStmt *
CreateDropPublicationStmt(const ObjectAddress *address)
{
Assert(address->classId == PublicationRelationId);
DropStmt *dropStmt = makeNode(DropStmt);
dropStmt->removeType = OBJECT_PUBLICATION;
dropStmt->behavior = DROP_RESTRICT;
HeapTuple publicationTuple =
SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(address->objectId));
if (!HeapTupleIsValid(publicationTuple))
{
ereport(ERROR, (errmsg("cannot find publication with oid: %d",
address->objectId)));
}
Form_pg_publication publicationForm =
(Form_pg_publication) GETSTRUCT(publicationTuple);
char *publicationName = NameStr(publicationForm->pubname);
dropStmt->objects = list_make1(makeString(publicationName));
ReleaseSysCache(publicationTuple);
return dropStmt;
}
/*
 * CreateDropStmt returns a DROP statement for the given object.
 */
DropStmt *
CreateDropStmt(const ObjectAddress *address)
{
	/* publications are the only object class we can build a DROP for */
	if (getObjectClass(address) == OCLASS_PUBLICATION)
	{
		return CreateDropPublicationStmt(address);
	}

	ereport(ERROR, (errmsg("unsupported object to construct a drop statement"),
					errdetail("unable to generate a parsetree for the drop")));
}
/*
* CreateRenameTypeStmt creates a rename statement for a type based on its ObjectAddress.
* The rename statement will rename the existing object on its address to the value

View File

@ -351,18 +351,17 @@ ShouldHideShardsInternal(void)
return false;
}
}
else if (MyBackendType != B_BACKEND)
else if (MyBackendType != B_BACKEND && MyBackendType != B_WAL_SENDER)
{
/*
* We are aiming only to hide shards from client
* backends or certain background workers(see above),
* not backends like walsender or checkpointer.
*/
return false;
}
if (IsCitusInternalBackend() || IsRebalancerInternalBackend() ||
IsCitusRunCommandBackend())
IsCitusRunCommandBackend() || IsCitusShardTransferBackend())
{
/* we never hide shards from Citus */
return false;

View File

@ -77,6 +77,7 @@ extern bool IsCitusInternalBackend(void);
extern bool IsRebalancerInternalBackend(void);
extern bool IsCitusRunCommandBackend(void);
extern bool IsExternalClientBackend(void);
extern bool IsCitusShardTransferBackend(void);
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999

View File

@ -409,6 +409,24 @@ extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, u
extern void AddRangeTableEntryToQueryCompat(ParseState *parseState, Relation relation);
/* publication.c - forward declarations */
extern List * PostProcessCreatePublicationStmt(Node *node, const char *queryString);
extern List * CreatePublicationDDLCommandsIdempotent(const ObjectAddress *address);
extern char * CreatePublicationDDLCommand(Oid publicationId);
extern List * PreprocessAlterPublicationStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityCtx);
extern List * GetAlterPublicationDDLCommandsForTable(Oid relationId, bool isAdd);
extern char * GetAlterPublicationTableDDLCommand(Oid publicationId, Oid relationId,
bool isAdd);
extern List * AlterPublicationOwnerStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
extern List * AlterPublicationStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
extern List * CreatePublicationStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
extern List * RenamePublicationStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
/* rename.c - forward declarations*/
extern List * PreprocessRenameStmt(Node *renameStmt, const char *renameCommand,
ProcessUtilityContext processUtilityContext);
@ -660,7 +678,6 @@ extern List * PreprocessDropViewStmt(Node *node, const char *queryString,
extern List * DropViewStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess);
extern char * CreateViewDDLCommand(Oid viewOid);
extern List * GetViewCreationCommandsOfTable(Oid relationId);
extern List * GetViewCreationTableDDLCommandsOfTable(Oid relationId);
extern char * AlterViewOwnerCommand(Oid viewOid);
extern char * DeparseViewStmt(Node *node);
extern char * DeparseDropViewStmt(Node *node);

View File

@ -42,6 +42,14 @@
/* application name used for connections made by run_command_on_* */
#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid="
/*
* application name prefix for move/split replication connections.
*
* This application_name is set to the subscription name by logical replication
* workers, so there is no GPID.
*/
#define CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX "citus_shard_"
/* deal with waiteventset errors */
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
#define WAIT_EVENT_SET_INDEX_FAILED -2

View File

@ -210,6 +210,23 @@ extern char * DeparseAlterExtensionStmt(Node *stmt);
/* forward declarations for deparse_database_stmts.c */
extern char * DeparseAlterDatabaseOwnerStmt(Node *node);
/* forward declaration for deparse_publication_stmts.c */
extern char * DeparseCreatePublicationStmt(Node *stmt);
extern char * DeparseCreatePublicationStmtExtended(Node *node,
bool whereClauseNeedsTransform,
bool includeLocalTables);
extern char * DeparseAlterPublicationStmt(Node *stmt);
extern char * DeparseAlterPublicationStmtExtended(Node *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables);
extern char * DeparseAlterPublicationOwnerStmt(Node *stmt);
extern char * DeparseAlterPublicationSchemaStmt(Node *node);
extern char * DeparseDropPublicationStmt(Node *stmt);
extern char * DeparseRenamePublicationStmt(Node *node);
extern void QualifyCreatePublicationStmt(Node *node);
extern void QualifyAlterPublicationStmt(Node *node);
/* forward declatations for deparse_text_search_stmts.c */
extern void QualifyAlterTextSearchConfigurationOwnerStmt(Node *node);
extern void QualifyAlterTextSearchConfigurationSchemaStmt(Node *node);

View File

@ -147,6 +147,7 @@ extern char * GetTableTypeName(Oid tableId);
extern void SetCreateCitusTransactionLevel(int val);
extern int GetCitusCreationLevel(void);
extern bool IsCitusTable(Oid relationId);
extern bool IsCitusTableRangeVar(RangeVar *rangeVar, LOCKMODE lockMode, bool missingOk);
extern bool IsCitusTableViaCatalog(Oid relationId);
extern char PgDistPartitionViaCatalog(Oid relationId);
extern List * LookupDistShardTuples(Oid relationId);

View File

@ -21,6 +21,7 @@
extern char * WrapCreateOrReplace(const char *sql);
extern char * WrapCreateOrReplaceList(List *sqls);
extern char * GenerateBackupNameForCollision(const ObjectAddress *address);
extern DropStmt * CreateDropStmt(const ObjectAddress *address);
extern RenameStmt * CreateRenameStatement(const ObjectAddress *address, char *newName);
#endif /* WORKER_CREATE_OR_REPLACE_H */

View File

@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribute the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -41,12 +41,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribute the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,7 +46,7 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
#insert data into the sensors table in the coordinator node before distributing the table.
$node_coordinator->safe_psql('postgres',"
@ -56,7 +56,6 @@ FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -26,7 +26,7 @@ wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
# Create the reference table in the coordinator and cdc client nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_reference_table('reference_table');");
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -56,7 +56,7 @@ FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
create_cdc_publication_and_slots_for_workers(\@workers,'sensors');
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -43,12 +43,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribute the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribute the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribute the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -48,12 +48,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribute the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribute the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);

View File

@ -195,7 +195,7 @@ sub create_cdc_publication_and_replication_slots_for_citus_cluster {
my $table_names = $_[2];
create_cdc_publication_and_slots_for_coordinator($node_coordinator, $table_names);
create_cdc_publication_and_slots_for_workers($workersref, $table_names);
create_cdc_slots_for_workers($workersref);
}
sub create_cdc_publication_and_slots_for_coordinator {
@ -210,31 +210,7 @@ sub create_cdc_publication_and_slots_for_coordinator {
$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)");
}
# Create the CDC publication and the replication slots on every worker node.
sub create_cdc_publication_and_slots_for_workers {
	my ($workersref, $table_names) = @_;
	create_cdc_publication_for_workers($workersref, $table_names);
	create_cdc_replication_slots_for_workers($workersref);
}
# (Re)create the 'cdc_publication' publication on every worker node.
# $table_names is either the literal "all" (publish all tables) or a
# comma-separated table list passed verbatim to CREATE PUBLICATION.
sub create_cdc_publication_for_workers {
	my ($workersref, $table_names) = @_;
	foreach my $worker (@$workersref) {
		my $existing_pub = $worker->safe_psql('postgres',
			"SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';");
		# drop a leftover publication from a previous run, if any
		$worker->safe_psql('postgres', "DROP PUBLICATION IF EXISTS cdc_publication;")
			if $existing_pub ne "";
		my $create_sql = ($table_names eq "all")
			? "CREATE PUBLICATION cdc_publication FOR ALL TABLES;"
			: "CREATE PUBLICATION cdc_publication FOR TABLE $table_names;";
		$worker->safe_psql('postgres', $create_sql);
	}
}
sub create_cdc_replication_slots_for_workers {
sub create_cdc_slots_for_workers {
my $workersref = $_[0];
for (@$workersref) {
my $slot = $_->safe_psql('postgres',"select * from pg_replication_slots where slot_name = 'cdc_replication_slot';");

View File

@ -223,7 +223,7 @@ check-follower-cluster: all
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS)
check-operations: all
$(pg_regress_multi_check) --load-extension=citus \
$(pg_regress_multi_check) --load-extension=citus --worker-count=6 \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/operations_schedule $(EXTRA_TESTS)
check-columnar: all

View File

@ -110,6 +110,14 @@ if __name__ == "__main__":
"multi_mx_function_table_reference",
],
),
"background_rebalance_parallel": TestDeps(
None,
[
"multi_test_helpers",
"multi_cluster_management",
],
worker_count=6,
),
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),

View File

@ -19,6 +19,7 @@ test: citus_local_tables_ent
test: remove_coordinator
# --------
test: publication
test: logical_replication
test: multi_create_table
test: multi_create_table_superuser

View File

@ -291,6 +291,12 @@ SELECT state, details from citus_rebalance_status();
finished | {"tasks": [], "task_state_counts": {"done": 2}}
(1 row)
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- Remove coordinator again to allow rerunning of this test
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?

View File

@ -0,0 +1,364 @@
/*
Test to check if the background tasks scheduled by the background rebalancer
has the correct dependencies.
*/
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Add two new node so that we can rebalance */
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640
table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639
table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640
table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639
table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640
table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639
table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640
table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639
table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640
table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639
table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640
table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639
(12 rows)
SELECT * FROM citus_rebalance_start();
citus_rebalance_start
---------------------------------------------------------------------
17777
(1 row)
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
/*Check that a move is dependent on
1. any other move scheduled earlier in its colocation group.
2. any other move scheduled earlier whose source node or target
node overlaps with the current moves nodes. */
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
shardid | colocationid
---------------------------------------------------------------------
85674000 | 50050
85674001 | 50050
85674002 | 50050
85674003 | 50050
85674004 | 50050
85674005 | 50050
85674006 | 50050
85674007 | 50050
85674008 | 50051
85674009 | 50051
85674010 | 50051
85674011 | 50051
85674012 | 50051
85674013 | 50051
85674014 | 50051
85674015 | 50051
85674016 | 50052
85674017 | 50052
85674018 | 50052
85674019 | 50052
85674020 | 50052
85674021 | 50052
85674022 | 50052
85674023 | 50052
(24 rows)
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto')
(7 rows)
/* Check that if there is a reference table that needs to be synched to a node,
any move without a dependency must depend on the move task for reference table. */
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true);
?column?
---------------------------------------------------------------------
1
(1 row)
/* Drain worker_3 so that we can move only one colocation group to worker_3
to create an unbalance that would cause parallel rebalancing. */
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
CREATE TABLE ref_table(a int PRIMARY KEY);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
/* Move all the shards of Colocation group 3 to worker_3.*/
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
master_move_shard_placement
---------------------------------------------------------------------
(4 rows)
CALL citus_cleanup_orphaned_resources();
/* Activate the new nodes so that we can rebalance. */
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM citus_rebalance_start();
citus_rebalance_start
---------------------------------------------------------------------
17778
(1 row)
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
shardid | colocationid
---------------------------------------------------------------------
85674000 | 50050
85674001 | 50050
85674002 | 50050
85674003 | 50050
85674004 | 50050
85674005 | 50050
85674006 | 50050
85674007 | 50050
85674008 | 50051
85674009 | 50051
85674010 | 50051
85674011 | 50051
85674012 | 50051
85674013 | 50051
85674014 | 50051
85674015 | 50051
85674016 | 50052
85674017 | 50052
85674018 | 50052
85674019 | 50052
85674020 | 50052
85674021 | 50052
85674022 | 50052
85674023 | 50052
85674024 | 50053
(25 rows)
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto')
1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto')
1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto')
1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto')
(6 rows)
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_4_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_5_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_6_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- keep the rest of the tests intact that depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;

View File

@ -410,5 +410,22 @@ DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
UPDATE color SET color_id = 1;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
DROP TABLE IF EXISTS test;
CREATE TABLE test (x int, y int, z bigint generated by default as identity);
SELECT create_distributed_table('test', 'x', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test VALUES (1,2);
INSERT INTO test SELECT x, y FROM test WHERE x = 1;
SELECT * FROM test;
x | y | z
---------------------------------------------------------------------
1 | 2 | 1
1 | 2 | 2
(2 rows)
DROP SCHEMA generated_identities CASCADE;
DROP USER identity_test_user;

View File

@ -410,5 +410,22 @@ DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
UPDATE color SET color_id = 1;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
DROP TABLE IF EXISTS test;
CREATE TABLE test (x int, y int, z bigint generated by default as identity);
SELECT create_distributed_table('test', 'x', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test VALUES (1,2);
INSERT INTO test SELECT x, y FROM test WHERE x = 1;
SELECT * FROM test;
x | y | z
---------------------------------------------------------------------
1 | 2 | 1
1 | 2 | 2
(2 rows)
DROP SCHEMA generated_identities CASCADE;
DROP USER identity_test_user;

View File

@ -25,7 +25,9 @@ NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipp
-- This allows us to test the cleanup logic at the start of the shard move.
\c - - - :worker_1_port
SET search_path TO logical_replication;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_6830000;
RESET citus.enable_ddl_propagation;
\c - - - :master_port
SET search_path TO logical_replication;
CREATE TABLE dist_6830000(
@ -155,6 +157,13 @@ SELECT count(*) from dist;
100
(1 row)
DROP PUBLICATION citus_shard_move_publication_:postgres_oid;
SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
pg_drop_replication_slot
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
@ -188,3 +197,9 @@ ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE;
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE);
DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid;
DROP SCHEMA logical_replication CASCADE;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)

View File

@ -713,13 +713,16 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
CREATE TABLE publication_test_table(id int);
CREATE PUBLICATION publication_test FOR TABLE publication_test_table;
CREATE OPERATOR === (
LEFTARG = int,
RIGHTARG = int,
FUNCTION = int4eq
);
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false))
AS (VALUES ('operator', ARRAY['===']::text[], ARRAY['int','int']::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: publication object can not be distributed by Citus
ERROR: operator object can not be distributed by Citus
ROLLBACK;
-- Show that citus_internal_add_object_metadata checks the privileges
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;

View File

@ -22,7 +22,7 @@ SELECT nodeid AS worker_1_id FROM pg_dist_node WHERE nodename = 'localhost' AND
SELECT nodeid AS worker_2_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_2_port;
worker_2_id
---------------------------------------------------------------------
18
35
(1 row)
\gset

View File

@ -444,7 +444,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
(4 rows)
-- or, we set it to walsender
-- the shards and indexes do show up
-- the shards and indexes do not show up
SELECT set_backend_type(9);
NOTICE: backend type switched to: walsender
set_backend_type
@ -452,6 +452,17 @@ NOTICE: backend type switched to: walsender
(1 row)
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname
---------------------------------------------------------------------
test_index
test_table
test_table_102008
test_table_2_1130000
(4 rows)
-- unless the application name starts with citus_shard
SET application_name = 'citus_shard_move';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname
---------------------------------------------------------------------
@ -467,6 +478,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
test_table_2_1130000
(10 rows)
RESET application_name;
-- but, client backends do see the shards
SELECT set_backend_type(3);
NOTICE: backend type switched to: client backend

View File

@ -16,7 +16,7 @@ SELECT nodeid AS worker_1_id FROM pg_dist_node WHERE nodename = 'localhost' AND
SELECT nodeid AS worker_2_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_2_port;
worker_2_id
---------------------------------------------------------------------
18
35
(1 row)
\gset

View File

@ -0,0 +1,379 @@
CREATE SCHEMA publication;
CREATE SCHEMA "publication-1";
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently
SELECT citus_set_coordinator_host('localhost', :master_port);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_2_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
-- create some publications with conflicting names on worker node
-- publication will be different from coordinator
CREATE PUBLICATION "pub-all";
-- publication will be same as coordinator
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
\c - - - :master_port
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- do not create publications on worker 2 initially
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- create a non-distributed publication
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete');
RESET citus.enable_ddl_propagation;
ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate');
-- create regular, distributed publications
CREATE PUBLICATION pubempty;
CREATE PUBLICATION pubinsertonly WITH (publish = 'insert');
CREATE PUBLICATION "pub-all" FOR ALL TABLES;
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
-- add worker 2 with publications
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- Check publications on all the nodes, if we see the same publication name twice then its definition differs
-- Note that publications are special in the sense that the coordinator object might differ from
-- worker objects due to the presence of regular tables.
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubempty WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubinsertonly WITH (publish_via_partition_root = ''false'', publish = ''insert'')');
(4 rows)
CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml);
CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int);
CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int);
-- various operations on a publication with only local tables
CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate');
ALTER PUBLICATION pubtables_orig DROP TABLE test;
ALTER PUBLICATION pubtables_orig ADD TABLE test;
-- publication will be empty on worker nodes, since all tables are local
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables_orig WITH (publish_via_partition_root = ''false'', publish = ''insert, truncate'')');
(1 row)
-- distribute a table, creating a mixed publication
SELECT create_distributed_table('test','x', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- some generic operations
ALTER PUBLICATION pubtables_orig RENAME TO pubtables;
ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete');
ALTER PUBLICATION pubtables OWNER TO postgres;
ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete');
ERROR: unrecognized value for publication option "publish": "inert"
ALTER PUBLICATION pubtables ADD TABLE notexist;
ERROR: relation "notexist" does not exist
-- operations with a distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test;
ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs";
-- operations with a local table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
SELECT create_distributed_table('"test-pubs"', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- test and test-pubs will show up in worker nodes
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, TABLE publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete'')');
(1 row)
-- operations with a strangely named distributed table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
-- create a publication with distributed and local tables
DROP PUBLICATION pubtables;
CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs";
-- change distributed tables
SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true);
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('test');
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table_concurrently('test', 'x');
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('"test-pubs"');
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('"test-pubs"');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- publications are unchanged despite various transformations
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, TABLE publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- partitioned table
CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a);
CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10);
ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey;
CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20);
ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
SELECT create_distributed_table('testpub_partitioned', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')');
(1 row)
DROP PUBLICATION pubpartitioned;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
-- add a partition
ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
ERROR: malformed array literal: ""
DETAIL: Array value must start with "{" or dimension information.
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
DROP PUBLICATION pubempty;
DROP PUBLICATION pubtables;
DROP PUBLICATION pubinsertonly;
DROP PUBLICATION "pub-all-insertupdateonly";
DROP PUBLICATION "pub-all";
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION pubnotdistributed;
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
\q
\endif
-- recreate a mixed publication
CREATE PUBLICATION pubtables FOR TABLE test, "publication-1"."test-pubs";
-- operations on an existing distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test (y);
ALTER PUBLICATION pubtables SET TABLE test WHERE (doc IS DOCUMENT);
ALTER PUBLICATION pubtables SET TABLE test WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE doc));
ALTER PUBLICATION pubtables SET TABLE test WHERE (CASE x WHEN 5 THEN true ELSE false END);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test WHERE (CASE test.x WHEN 5 THEN true ELSE false END) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
ALTER PUBLICATION pubtables SET TABLE test ("column-1", x) WHERE (x > "column-1"), "publication-1"."test-pubs";
-- operations on a local table
ALTER PUBLICATION pubtables DROP TABLE "publication-1"."test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "publication-1"."test-pubs" (y);
-- mixed operations
ALTER PUBLICATION pubtables SET TABLE test, TABLES IN SCHEMA "publication-1", TABLES IN SCHEMA current_schema;
ALTER PUBLICATION pubtables SET TABLE "publication-1"."test-pubs", test ("column-1", x) WHERE (x > "column-1");
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test (x, "column-1") WHERE ((test.x > test."column-1")) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- publication with schemas
CREATE PUBLICATION "pub-mix" FOR TABLE test, TABLES IN SCHEMA current_schema, TABLE "publication-1"."test-pubs", TABLES IN SCHEMA "publication-1";
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pub-mix%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-mix" FOR TABLES IN SCHEMA publication, TABLES IN SCHEMA "publication-1", TABLE publication.test WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- publication on a partitioned table
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned (a, b) WITH (publish_via_partition_root = 'true');
ALTER PUBLICATION pubpartitioned SET (publish_via_partition_root = 1);
SELECT alter_distributed_table('testpub_partitioned', shard_count := 6, cascade_to_colocated := true);
NOTICE: converting the partitions of publication.testpub_partitioned
NOTICE: creating a new table for publication.testpub_partitioned_0
NOTICE: moving the data of publication.testpub_partitioned_0
NOTICE: dropping the old publication.testpub_partitioned_0
NOTICE: renaming the new table to publication.testpub_partitioned_0
NOTICE: creating a new table for publication.testpub_partitioned_1
NOTICE: moving the data of publication.testpub_partitioned_1
NOTICE: dropping the old publication.testpub_partitioned_1
NOTICE: renaming the new table to publication.testpub_partitioned_1
NOTICE: creating a new table for publication.testpub_partitioned
NOTICE: dropping the old publication.testpub_partitioned
NOTICE: renaming the new table to publication.testpub_partitioned
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned (a, b) WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- make sure we propagate schema dependencies
SET citus.create_object_propagation TO 'deferred';
BEGIN;
CREATE SCHEMA deptest;
END;
CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest;
RESET citus.create_object_propagation;
DROP SCHEMA deptest CASCADE;
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
DROP PUBLICATION pubdep;
DROP PUBLICATION "pub-mix";
DROP PUBLICATION pubtables;
DROP PUBLICATION pubpartitioned;
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)

View File

@ -0,0 +1,273 @@
CREATE SCHEMA publication;
CREATE SCHEMA "publication-1";
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently
SELECT citus_set_coordinator_host('localhost', :master_port);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_2_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
-- create some publications with conflicting names on worker node
-- publication will be different from coordinator
CREATE PUBLICATION "pub-all";
-- publication will be same as coordinator
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
\c - - - :master_port
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- do not create publications on worker 2 initially
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- create a non-distributed publication
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete');
RESET citus.enable_ddl_propagation;
ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate');
-- create regular, distributed publications
CREATE PUBLICATION pubempty;
CREATE PUBLICATION pubinsertonly WITH (publish = 'insert');
CREATE PUBLICATION "pub-all" FOR ALL TABLES;
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
-- add worker 2 with publications
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- Check publications on all the nodes, if we see the same publication name twice then its definition differs
-- Note that publications are special in the sense that the coordinator object might differ from
-- worker objects due to the presence of regular tables.
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubempty WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubinsertonly WITH (publish_via_partition_root = ''false'', publish = ''insert'')');
(4 rows)
CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml);
CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int);
CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int);
-- various operations on a publication with only local tables
CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate');
ALTER PUBLICATION pubtables_orig DROP TABLE test;
ALTER PUBLICATION pubtables_orig ADD TABLE test;
-- publication will be empty on worker nodes, since all tables are local
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables_orig WITH (publish_via_partition_root = ''false'', publish = ''insert, truncate'')');
(1 row)
-- distribute a table, creating a mixed publication
SELECT create_distributed_table('test','x', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- some generic operations
ALTER PUBLICATION pubtables_orig RENAME TO pubtables;
ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete');
ALTER PUBLICATION pubtables OWNER TO postgres;
ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete');
ERROR: unrecognized "publish" value: "inert"
ALTER PUBLICATION pubtables ADD TABLE notexist;
ERROR: relation "notexist" does not exist
-- operations with a distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test;
ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs";
-- operations with a local table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
SELECT create_distributed_table('"test-pubs"', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- test and test-pubs will show up in worker nodes
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete'')');
(1 row)
-- operations with a strangely named distributed table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
-- create a publication with distributed and local tables
DROP PUBLICATION pubtables;
CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs";
-- change distributed tables
SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true);
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('test');
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table_concurrently('test', 'x');
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('"test-pubs"');
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('"test-pubs"');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- publications are unchanged despite various transformations
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- partitioned table
CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a);
CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10);
ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey;
CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20);
ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
SELECT create_distributed_table('testpub_partitioned', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')');
(1 row)
DROP PUBLICATION pubpartitioned;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
-- add a partition
ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
ERROR: malformed array literal: ""
DETAIL: Array value must start with "{" or dimension information.
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
DROP PUBLICATION pubempty;
DROP PUBLICATION pubtables;
DROP PUBLICATION pubinsertonly;
DROP PUBLICATION "pub-all-insertupdateonly";
DROP PUBLICATION "pub-all";
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION pubnotdistributed;
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
\q

View File

@ -61,7 +61,9 @@ SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
@ -261,7 +263,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
@ -428,7 +432,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
@ -597,8 +603,10 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,

View File

@ -5,6 +5,7 @@ test: shard_rebalancer_unit
test: shard_rebalancer
test: background_rebalance
test: worker_copy_table_to_node
test: background_rebalance_parallel
test: foreign_key_to_reference_shard_rebalance
test: multi_move_mx
test: shard_move_deferred_delete

View File

@ -104,6 +104,8 @@ SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
SELECT citus_rebalance_wait();
SELECT state, details from citus_rebalance_status();
SELECT public.wait_for_resource_cleanup();
-- Remove coordinator again to allow rerunning of this test
SELECT 1 FROM citus_remove_node('localhost', :master_port);
SELECT public.wait_until_metadata_sync(30000);

View File

@ -0,0 +1,141 @@
/*
Test to check if the background tasks scheduled by the background rebalancer
have the correct dependencies.
*/
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
/* Add two new nodes so that we can rebalance */
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
SELECT * FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
/* Check that a move is dependent on
1. any other move scheduled earlier in its colocation group.
2. any other move scheduled earlier whose source node or target
node overlaps with the current moves nodes. */
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC;
/* Check that if there is a reference table that needs to be synched to a node,
any move without a dependency must depend on the move task for reference table. */
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
SELECT public.wait_for_resource_cleanup();
SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true);
/* Drain worker_3 so that we can move only one colocation group to worker_3
to create an unbalance that would cause parallel rebalancing. */
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
CALL citus_cleanup_orphaned_resources();
CREATE TABLE ref_table(a int PRIMARY KEY);
SELECT create_reference_table('ref_table');
/* Move all the shards of Colocation group 3 to worker_3.*/
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
CALL citus_cleanup_orphaned_resources();
/* Activate and add new nodes so that we can rebalance. */
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
SELECT * FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC;
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_3_port);
select citus_remove_node('localhost', :worker_4_port);
select citus_remove_node('localhost', :worker_5_port);
select citus_remove_node('localhost', :worker_6_port);
-- keep the rest of the tests intact that depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;

View File

@ -224,5 +224,12 @@ INSERT INTO color(color_id, color_name) OVERRIDING SYSTEM VALUE VALUES (1, 'Red'
UPDATE color SET color_id = NULL;
UPDATE color SET color_id = 1;
DROP TABLE IF EXISTS test;
CREATE TABLE test (x int, y int, z bigint generated by default as identity);
SELECT create_distributed_table('test', 'x', colocate_with := 'none');
INSERT INTO test VALUES (1,2);
INSERT INTO test SELECT x, y FROM test WHERE x = 1;
SELECT * FROM test;
DROP SCHEMA generated_identities CASCADE;
DROP USER identity_test_user;

View File

@ -21,7 +21,9 @@ SELECT 1 from citus_add_node('localhost', :master_port, groupId := 0);
-- This allows us to test the cleanup logic at the start of the shard move.
\c - - - :worker_1_port
SET search_path TO logical_replication;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_6830000;
RESET citus.enable_ddl_propagation;
\c - - - :master_port
SET search_path TO logical_replication;
@ -72,6 +74,9 @@ SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT count(*) from dist;
DROP PUBLICATION citus_shard_move_publication_:postgres_oid;
SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
\c - - - :worker_2_port
SET search_path TO logical_replication;
@ -88,3 +93,4 @@ ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE;
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE);
DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid;
DROP SCHEMA logical_replication CASCADE;
SELECT public.wait_for_resource_cleanup();

View File

@ -429,12 +429,15 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
CREATE TABLE publication_test_table(id int);
CREATE PUBLICATION publication_test FOR TABLE publication_test_table;
CREATE OPERATOR === (
LEFTARG = int,
RIGHTARG = int,
FUNCTION = int4eq
);
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false))
AS (VALUES ('operator', ARRAY['===']::text[], ARRAY['int','int']::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;

View File

@ -232,10 +232,15 @@ SELECT set_backend_type(4);
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
-- or, we set it to walsender
-- the shards and indexes do show up
-- the shards and indexes do not show up
SELECT set_backend_type(9);
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
-- unless the application name starts with citus_shard
SET application_name = 'citus_shard_move';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
RESET application_name;
-- but, client backends do see the shards
SELECT set_backend_type(3);
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;

View File

@ -0,0 +1,269 @@
CREATE SCHEMA publication;
CREATE SCHEMA "publication-1";
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently
SELECT citus_set_coordinator_host('localhost', :master_port);
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_2_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
-- create some publications with conflicting names on worker node
-- publication will be different from coordinator
CREATE PUBLICATION "pub-all";
-- publication will be same as coordinator
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
\c - - - :master_port
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- do not create publications on worker 2 initially
SELECT citus_remove_node('localhost', :worker_2_port);
-- create a non-distributed publication
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete');
RESET citus.enable_ddl_propagation;
ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate');
-- create regular, distributed publications
CREATE PUBLICATION pubempty;
CREATE PUBLICATION pubinsertonly WITH (publish = 'insert');
CREATE PUBLICATION "pub-all" FOR ALL TABLES;
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
-- add worker 2 with publications
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-- Check publications on all the nodes, if we see the same publication name twice then its definition differs
-- Note that publications are special in the sense that the coordinator object might differ from
-- worker objects due to the presence of regular tables.
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$)
ORDER BY c) s;
CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml);
CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int);
CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int);
-- various operations on a publication with only local tables
CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate');
ALTER PUBLICATION pubtables_orig DROP TABLE test;
ALTER PUBLICATION pubtables_orig ADD TABLE test;
-- publication will be empty on worker nodes, since all tables are local
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- distribute a table, creating a mixed publication
SELECT create_distributed_table('test','x', colocate_with := 'none');
-- some generic operations
ALTER PUBLICATION pubtables_orig RENAME TO pubtables;
ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete');
ALTER PUBLICATION pubtables OWNER TO postgres;
ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete');
ALTER PUBLICATION pubtables ADD TABLE notexist;
-- operations with a distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test;
ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs";
-- operations with a local table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
SELECT create_distributed_table('"test-pubs"', 'x');
-- test and test-pubs will show up in worker nodes
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- operations with a strangely named distributed table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
-- create a publication with distributed and local tables
DROP PUBLICATION pubtables;
CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs";
-- change distributed tables
SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true);
SELECT undistribute_table('test');
SELECT citus_add_local_table_to_metadata('test');
SELECT create_distributed_table_concurrently('test', 'x');
SELECT undistribute_table('"test-pubs"');
SELECT create_reference_table('"test-pubs"');
-- publications are unchanged despite various transformations
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- partitioned table
CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a);
CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10);
ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey;
CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20);
ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
SELECT create_distributed_table('testpub_partitioned', 'a');
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
DROP PUBLICATION pubpartitioned;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
-- add a partition
ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1;
-- NOTE(review): '%CREATE PUBLIATION%' in the filter below is presumably a typo
-- for '%CREATE PUBLICATION%'. As written the LIKE matches nothing, array_agg
-- returns NULL, and unnest fails with "malformed array literal" — which is what
-- the expected output currently captures, so the query tests nothing. Confirm,
-- fix the pattern, and regenerate the expected output.
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
DROP PUBLICATION pubempty;
DROP PUBLICATION pubtables;
DROP PUBLICATION pubinsertonly;
DROP PUBLICATION "pub-all-insertupdateonly";
DROP PUBLICATION "pub-all";
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION pubnotdistributed;
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
\q
\endif
-- recreate a mixed publication
CREATE PUBLICATION pubtables FOR TABLE test, "publication-1"."test-pubs";
-- operations on an existing distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test (y);
ALTER PUBLICATION pubtables SET TABLE test WHERE (doc IS DOCUMENT);
ALTER PUBLICATION pubtables SET TABLE test WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE doc));
ALTER PUBLICATION pubtables SET TABLE test WHERE (CASE x WHEN 5 THEN true ELSE false END);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
ALTER PUBLICATION pubtables SET TABLE test ("column-1", x) WHERE (x > "column-1"), "publication-1"."test-pubs";
-- operations on a local table
ALTER PUBLICATION pubtables DROP TABLE "publication-1"."test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "publication-1"."test-pubs" (y);
-- mixed operations
ALTER PUBLICATION pubtables SET TABLE test, TABLES IN SCHEMA "publication-1", TABLES IN SCHEMA current_schema;
ALTER PUBLICATION pubtables SET TABLE "publication-1"."test-pubs", test ("column-1", x) WHERE (x > "column-1");
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- publication with schemas
CREATE PUBLICATION "pub-mix" FOR TABLE test, TABLES IN SCHEMA current_schema, TABLE "publication-1"."test-pubs", TABLES IN SCHEMA "publication-1";
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pub-mix%' ORDER BY 1) s$$)
ORDER BY c) s;
-- publication on a partitioned table
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned (a, b) WITH (publish_via_partition_root = 'true');
ALTER PUBLICATION pubpartitioned SET (publish_via_partition_root = 1);
SELECT alter_distributed_table('testpub_partitioned', shard_count := 6, cascade_to_colocated := true);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
-- make sure we propagate schema dependencies
SET citus.create_object_propagation TO 'deferred';
BEGIN;
CREATE SCHEMA deptest;
END;
CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest;
RESET citus.create_object_propagation;
DROP SCHEMA deptest CASCADE;
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
DROP PUBLICATION pubdep;
DROP PUBLICATION "pub-mix";
DROP PUBLICATION pubtables;
DROP PUBLICATION pubpartitioned;
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);

View File

@ -64,7 +64,9 @@ CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
@ -176,7 +178,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
@ -282,7 +286,9 @@ SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
@ -401,8 +407,10 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,