Propagate CREATE/ALTER/DROP PUBLICATION statements (#6776)

Co-authored-by: Marco Slot <marco.slot@gmail.com>
pull/6596/merge
Marco Slot 2023-03-29 15:25:35 +02:00 committed by GitHub
commit ce4bcf6de0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 2909 additions and 75 deletions

View File

@ -217,6 +217,7 @@ static bool WillRecreateForeignKeyToReferenceTable(Oid relationId,
CascadeToColocatedOption cascadeOption);
static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId);
static void ErrorIfUnsupportedCascadeObjects(Oid relationId);
static List * WrapTableDDLCommands(List *commandStrings);
static bool DoesCascadeDropUnsupportedObject(Oid classId, Oid id, HTAB *nodeMap);
static TableConversionReturn * CopyTableConversionReturnIntoCurrentContext(
TableConversionReturn *tableConversionReturn);
@ -604,9 +605,18 @@ ConvertTableInternal(TableConversionState *con)
List *justBeforeDropCommands = NIL;
List *attachPartitionCommands = NIL;
postLoadCommands =
list_concat(postLoadCommands,
GetViewCreationTableDDLCommandsOfTable(con->relationId));
List *createViewCommands = GetViewCreationCommandsOfTable(con->relationId);
postLoadCommands = list_concat(postLoadCommands,
WrapTableDDLCommands(createViewCommands));
/* need to add back to publications after dropping the original table */
bool isAdd = true;
List *alterPublicationCommands =
GetAlterPublicationDDLCommandsForTable(con->relationId, isAdd);
postLoadCommands = list_concat(postLoadCommands,
WrapTableDDLCommands(alterPublicationCommands));
List *foreignKeyCommands = NIL;
if (con->conversionType == ALTER_DISTRIBUTED_TABLE)
@ -1493,17 +1503,16 @@ GetViewCreationCommandsOfTable(Oid relationId)
/*
* GetViewCreationTableDDLCommandsOfTable is the same as GetViewCreationCommandsOfTable,
* but the returned list includes objects of TableDDLCommand's, not strings.
* WrapTableDDLCommands takes a list of command strings and wraps them
* in TableDDLCommand structs.
*/
List *
GetViewCreationTableDDLCommandsOfTable(Oid relationId)
static List *
WrapTableDDLCommands(List *commandStrings)
{
List *commands = GetViewCreationCommandsOfTable(relationId);
List *tableDDLCommands = NIL;
char *command = NULL;
foreach_ptr(command, commands)
foreach_ptr(command, commandStrings)
{
tableDDLCommands = lappend(tableDDLCommands, makeTableDDLCommandString(command));
}

View File

@ -85,6 +85,7 @@ static void DropRelationTruncateTriggers(Oid relationId);
static char * GetDropTriggerCommand(Oid relationId, char *triggerName);
static void DropViewsOnTable(Oid relationId);
static void DropIdentitiesOnTable(Oid relationId);
static void DropTableFromPublications(Oid relationId);
static List * GetRenameStatsCommandList(List *statsOidList, uint64 shardId);
static List * ReversedOidList(List *oidList);
static void AppendExplicitIndexIdsToList(Form_pg_index indexForm,
@ -338,6 +339,10 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
List *shellTableDDLEvents = GetShellTableDDLEventsForCitusLocalTable(relationId);
List *tableViewCreationCommands = GetViewCreationCommandsOfTable(relationId);
bool isAdd = true;
List *alterPublicationCommands =
GetAlterPublicationDDLCommandsForTable(relationId, isAdd);
char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
@ -347,6 +352,12 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
*/
DropIdentitiesOnTable(relationId);
/*
* We do not want the shard to be in the publication (subscribers are
* unlikely to recognize it).
*/
DropTableFromPublications(relationId);
/* below we convert relation with relationId to the shard relation */
uint64 shardId = ConvertLocalTableToShard(relationId);
@ -363,6 +374,11 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
*/
ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(tableViewCreationCommands);
/*
* Execute the publication creation commands with the shell table.
*/
ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(alterPublicationCommands);
/*
* Set shellRelationId as the relation with relationId now points
* to the shard relation.
@ -1163,6 +1179,21 @@ DropIdentitiesOnTable(Oid relationId)
}
/*
* DropTableFromPublications drops the table from all of its publications.
*/
static void
DropTableFromPublications(Oid relationId)
{
bool isAdd = false;
List *alterPublicationCommands =
GetAlterPublicationDDLCommandsForTable(relationId, isAdd);
ExecuteAndLogUtilityCommandList(alterPublicationCommands);
}
/*
* DropViewsOnTable drops the views that depend on the given relation.
*/

View File

@ -438,6 +438,11 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return DDLCommands;
}
case OCLASS_PUBLICATION:
{
return CreatePublicationDDLCommandsIdempotent(dependency);
}
case OCLASS_ROLE:
{
return GenerateCreateOrAlterRoleCommand(dependency->objectId);

View File

@ -245,6 +245,15 @@ static DistributeObjectOps Any_CreatePolicy = {
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Any_CreatePublication = {
.deparse = DeparseCreatePublicationStmt,
.qualify = QualifyCreatePublicationStmt,
.preprocess = NULL,
.postprocess = PostProcessCreatePublicationStmt,
.operationType = DIST_OPS_CREATE,
.address = CreatePublicationStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Any_CreateRole = {
.deparse = DeparseCreateRoleStmt,
.qualify = NULL,
@ -707,6 +716,45 @@ static DistributeObjectOps Procedure_Rename = {
.address = RenameFunctionStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Publication_Alter = {
.deparse = DeparseAlterPublicationStmt,
.qualify = QualifyAlterPublicationStmt,
.preprocess = PreprocessAlterPublicationStmt,
.postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_PUBLICATION,
.operationType = DIST_OPS_ALTER,
.address = AlterPublicationStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Publication_AlterOwner = {
.deparse = DeparseAlterPublicationOwnerStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_PUBLICATION,
.operationType = DIST_OPS_ALTER,
.address = AlterPublicationOwnerStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Publication_Drop = {
.deparse = DeparseDropPublicationStmt,
.qualify = NULL,
.preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL,
.operationType = DIST_OPS_DROP,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Publication_Rename = {
.deparse = DeparseRenamePublicationStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL,
.objectType = OBJECT_PUBLICATION,
.operationType = DIST_OPS_ALTER,
.address = RenamePublicationStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Routine_AlterObjectDepends = {
.deparse = DeparseAlterFunctionDependsStmt,
.qualify = QualifyAlterFunctionDependsStmt,
@ -1399,6 +1447,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_AlterOwner;
}
case OBJECT_PUBLICATION:
{
return &Publication_AlterOwner;
}
case OBJECT_ROUTINE:
{
return &Routine_AlterOwner;
@ -1436,6 +1489,11 @@ GetDistributeObjectOps(Node *node)
return &Any_AlterPolicy;
}
case T_AlterPublicationStmt:
{
return &Publication_Alter;
}
case T_AlterRoleStmt:
{
return &Any_AlterRole;
@ -1610,6 +1668,11 @@ GetDistributeObjectOps(Node *node)
return &Any_CreatePolicy;
}
case T_CreatePublicationStmt:
{
return &Any_CreatePublication;
}
case T_CreateRoleStmt:
{
return &Any_CreateRole;
@ -1722,6 +1785,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_Drop;
}
case OBJECT_PUBLICATION:
{
return &Publication_Drop;
}
case OBJECT_ROUTINE:
{
return &Routine_Drop;
@ -1901,6 +1969,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_Rename;
}
case OBJECT_PUBLICATION:
{
return &Publication_Rename;
}
case OBJECT_ROUTINE:
{
return &Routine_Rename;

View File

@ -0,0 +1,634 @@
/*-------------------------------------------------------------------------
*
* publication.c
* Commands for creating publications
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/reference_table_utils.h"
#include "distributed/worker_create_or_replace.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "pg_version_compat.h"
static CreatePublicationStmt * BuildCreatePublicationStmt(Oid publicationId);
#if (PG_VERSION_NUM >= PG_VERSION_15)
static PublicationObjSpec * BuildPublicationRelationObjSpec(Oid relationId,
Oid publicationId,
bool tableOnly);
#endif
static void AppendPublishOptionList(StringInfo str, List *strings);
static char * AlterPublicationOwnerCommand(Oid publicationId);
static bool ShouldPropagateCreatePublication(CreatePublicationStmt *stmt);
static List * ObjectAddressForPublicationName(char *publicationName, bool missingOk);
/*
* PostProcessCreatePublicationStmt handles CREATE PUBLICATION statements
* that contain distributed tables.
*/
List *
PostProcessCreatePublicationStmt(Node *node, const char *queryString)
{
CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node);
if (!ShouldPropagateCreatePublication(stmt))
{
/* should not propagate right now */
return NIL;
}
/* call into CreatePublicationStmtObjectAddress */
List *publicationAddresses = GetObjectAddressListFromParseTree(node, false, true);
/* the code-path only supports a single object */
Assert(list_length(publicationAddresses) == 1);
if (IsAnyObjectAddressOwnedByExtension(publicationAddresses, NULL))
{
/* should not propagate publications owned by extensions */
return NIL;
}
EnsureAllObjectDependenciesExistOnAllNodes(publicationAddresses);
const ObjectAddress *pubAddress = linitial(publicationAddresses);
List *commands = NIL;
commands = lappend(commands, DISABLE_DDL_PROPAGATION);
commands = lappend(commands, CreatePublicationDDLCommand(pubAddress->objectId));
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* CreatePublicationDDLCommandsIdempotent returns a list of DDL statements to be
* executed on a node to recreate the publication addressed by the publicationAddress.
*/
List *
CreatePublicationDDLCommandsIdempotent(const ObjectAddress *publicationAddress)
{
Assert(publicationAddress->classId == PublicationRelationId);
char *ddlCommand =
CreatePublicationDDLCommand(publicationAddress->objectId);
char *alterPublicationOwnerSQL =
AlterPublicationOwnerCommand(publicationAddress->objectId);
return list_make2(
WrapCreateOrReplace(ddlCommand),
alterPublicationOwnerSQL);
}
/*
* CreatePublicationDDLCommand returns the CREATE PUBLICATION string that
* can be used to recreate a given publication.
*/
char *
CreatePublicationDDLCommand(Oid publicationId)
{
CreatePublicationStmt *createPubStmt = BuildCreatePublicationStmt(publicationId);
/* we took the WHERE clause from the catalog where it is already transformed */
bool whereClauseRequiresTransform = false;
/* only propagate Citus tables in publication */
bool includeLocalTables = false;
return DeparseCreatePublicationStmtExtended((Node *) createPubStmt,
whereClauseRequiresTransform,
includeLocalTables);
}
/*
* BuildCreatePublicationStmt constructs a CreatePublicationStmt struct for the
* given publication.
*/
static CreatePublicationStmt *
BuildCreatePublicationStmt(Oid publicationId)
{
CreatePublicationStmt *createPubStmt = makeNode(CreatePublicationStmt);
HeapTuple publicationTuple =
SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(publicationId));
if (!HeapTupleIsValid(publicationTuple))
{
ereport(ERROR, (errmsg("cannot find publication with oid: %d", publicationId)));
}
Form_pg_publication publicationForm =
(Form_pg_publication) GETSTRUCT(publicationTuple);
/* CREATE PUBLICATION <name> */
createPubStmt->pubname = pstrdup(NameStr(publicationForm->pubname));
/* FOR ALL TABLES */
createPubStmt->for_all_tables = publicationForm->puballtables;
ReleaseSysCache(publicationTuple);
#if (PG_VERSION_NUM >= PG_VERSION_15)
List *schemaIds = GetPublicationSchemas(publicationId);
Oid schemaId = InvalidOid;
foreach_oid(schemaId, schemaIds)
{
char *schemaName = get_namespace_name(schemaId);
PublicationObjSpec *publicationObject = makeNode(PublicationObjSpec);
publicationObject->pubobjtype = PUBLICATIONOBJ_TABLES_IN_SCHEMA;
publicationObject->pubtable = NULL;
publicationObject->name = schemaName;
publicationObject->location = -1;
createPubStmt->pubobjects = lappend(createPubStmt->pubobjects, publicationObject);
}
#endif
List *relationIds = GetPublicationRelations(publicationId,
publicationForm->pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
Oid relationId = InvalidOid;
int citusTableCount PG_USED_FOR_ASSERTS_ONLY = 0;
/* mainly for consistent ordering in test output */
relationIds = SortList(relationIds, CompareOids);
foreach_oid(relationId, relationIds)
{
#if (PG_VERSION_NUM >= PG_VERSION_15)
bool tableOnly = false;
/* since postgres 15, tables can have a column list and filter */
PublicationObjSpec *publicationObject =
BuildPublicationRelationObjSpec(relationId, publicationId, tableOnly);
createPubStmt->pubobjects = lappend(createPubStmt->pubobjects, publicationObject);
#else
/* before postgres 15, only full tables are supported */
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *tableName = get_rel_name(relationId);
RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1);
createPubStmt->tables = lappend(createPubStmt->tables, rangeVar);
#endif
if (IsCitusTable(relationId))
{
citusTableCount++;
}
}
/* WITH (publish_via_partition_root = true) option */
bool publishViaRoot = publicationForm->pubviaroot;
char *publishViaRootString = publishViaRoot ? "true" : "false";
DefElem *pubViaRootOption = makeDefElem("publish_via_partition_root",
(Node *) makeString(publishViaRootString),
-1);
createPubStmt->options = lappend(createPubStmt->options, pubViaRootOption);
/* WITH (publish = 'insert, update, delete, truncate') option */
List *publishList = NIL;
if (publicationForm->pubinsert)
{
publishList = lappend(publishList, makeString("insert"));
}
if (publicationForm->pubupdate)
{
publishList = lappend(publishList, makeString("update"));
}
if (publicationForm->pubdelete)
{
publishList = lappend(publishList, makeString("delete"));
}
if (publicationForm->pubtruncate)
{
publishList = lappend(publishList, makeString("truncate"));
}
if (list_length(publishList) > 0)
{
StringInfo optionValue = makeStringInfo();
AppendPublishOptionList(optionValue, publishList);
DefElem *publishOption = makeDefElem("publish",
(Node *) makeString(optionValue->data), -1);
createPubStmt->options = lappend(createPubStmt->options, publishOption);
}
return createPubStmt;
}
/*
* AppendPublishOptionList appends a list of publication options in
* comma-separate form.
*/
static void
AppendPublishOptionList(StringInfo str, List *options)
{
ListCell *stringCell = NULL;
foreach(stringCell, options)
{
const char *string = strVal(lfirst(stringCell));
if (stringCell != list_head(options))
{
appendStringInfoString(str, ", ");
}
/* we cannot escape these strings */
appendStringInfoString(str, string);
}
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
/*
* BuildPublicationRelationObjSpec returns a PublicationObjSpec that
* can be included in a CREATE or ALTER PUBLICATION statement.
*/
static PublicationObjSpec *
BuildPublicationRelationObjSpec(Oid relationId, Oid publicationId,
bool tableOnly)
{
HeapTuple pubRelationTuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(relationId),
ObjectIdGetDatum(publicationId));
if (!HeapTupleIsValid(pubRelationTuple))
{
ereport(ERROR, (errmsg("cannot find relation with oid %d in publication "
"with oid %d", relationId, publicationId)));
}
List *columnNameList = NIL;
Node *whereClause = NULL;
/* build the column list */
if (!tableOnly)
{
bool isNull = false;
Datum attributesDatum = SysCacheGetAttr(PUBLICATIONRELMAP, pubRelationTuple,
Anum_pg_publication_rel_prattrs,
&isNull);
if (!isNull)
{
ArrayType *attributesArray = DatumGetArrayTypeP(attributesDatum);
int attributeCount = ARR_DIMS(attributesArray)[0];
int16 *elems = (int16 *) ARR_DATA_PTR(attributesArray);
for (int attNumIndex = 0; attNumIndex < attributeCount; attNumIndex++)
{
AttrNumber attributeNumber = elems[attNumIndex];
char *columnName = get_attname(relationId, attributeNumber, false);
columnNameList = lappend(columnNameList, makeString(columnName));
}
}
/* build the WHERE clause */
Datum whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, pubRelationTuple,
Anum_pg_publication_rel_prqual,
&isNull);
if (!isNull)
{
/*
* We use the already-transformed parse tree form here, which does
* not match regular CreatePublicationStmt
*/
whereClause = stringToNode(TextDatumGetCString(whereClauseDatum));
}
}
ReleaseSysCache(pubRelationTuple);
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *tableName = get_rel_name(relationId);
RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1);
/* build the FOR TABLE */
PublicationTable *publicationTable =
makeNode(PublicationTable);
publicationTable->relation = rangeVar;
publicationTable->whereClause = whereClause;
publicationTable->columns = columnNameList;
PublicationObjSpec *publicationObject = makeNode(PublicationObjSpec);
publicationObject->pubobjtype = PUBLICATIONOBJ_TABLE;
publicationObject->pubtable = publicationTable;
publicationObject->name = NULL;
publicationObject->location = -1;
return publicationObject;
}
#endif
/*
* PreprocessAlterPublicationStmt handles ALTER PUBLICATION statements
* in a way that is mostly similar to PreprocessAlterDistributedObjectStmt,
* except we do not ensure sequential mode (publications do not interact with
* shards) and can handle NULL deparse commands for ALTER PUBLICATION commands
* that only involve local tables.
*/
List *
PreprocessAlterPublicationStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
List *addresses = GetObjectAddressListFromParseTree(stmt, false, false);
/* the code-path only supports a single object */
Assert(list_length(addresses) == 1);
if (!ShouldPropagateAnyObject(addresses))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode(stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
if (sql == NULL)
{
/*
* Deparsing logic decided that there is nothing to propagate, e.g.
* because the command only concerns local tables.
*/
return NIL;
}
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* GetAlterPublicationDDLCommandsForTable gets a list of ALTER PUBLICATION .. ADD/DROP
* commands for the given table.
*
* If isAdd is true, it return ALTER PUBLICATION .. ADD TABLE commands for all
* publications.
*
* Otherwise, it returns ALTER PUBLICATION .. DROP TABLE commands for all
* publications.
*/
List *
GetAlterPublicationDDLCommandsForTable(Oid relationId, bool isAdd)
{
List *commands = NIL;
List *publicationIds = GetRelationPublications(relationId);
Oid publicationId = InvalidOid;
foreach_oid(publicationId, publicationIds)
{
char *command = GetAlterPublicationTableDDLCommand(publicationId,
relationId, isAdd);
commands = lappend(commands, command);
}
return commands;
}
/*
* GetAlterPublicationTableDDLCommand generates an ALTer PUBLICATION .. ADD/DROP TABLE
* command for the given publication and relation ID.
*
* If isAdd is true, it return an ALTER PUBLICATION .. ADD TABLE command.
* Otherwise, it returns ALTER PUBLICATION .. DROP TABLE command.
*/
char *
GetAlterPublicationTableDDLCommand(Oid publicationId, Oid relationId,
bool isAdd)
{
HeapTuple pubTuple = SearchSysCache1(PUBLICATIONOID,
ObjectIdGetDatum(publicationId));
if (!HeapTupleIsValid(pubTuple))
{
ereport(ERROR, (errmsg("cannot find publication with oid: %d",
publicationId)));
}
Form_pg_publication pubForm = (Form_pg_publication) GETSTRUCT(pubTuple);
AlterPublicationStmt *alterPubStmt = makeNode(AlterPublicationStmt);
alterPubStmt->pubname = pstrdup(NameStr(pubForm->pubname));
ReleaseSysCache(pubTuple);
#if (PG_VERSION_NUM >= PG_VERSION_15)
bool tableOnly = !isAdd;
/* since postgres 15, tables can have a column list and filter */
PublicationObjSpec *publicationObject =
BuildPublicationRelationObjSpec(relationId, publicationId, tableOnly);
alterPubStmt->pubobjects = lappend(alterPubStmt->pubobjects, publicationObject);
alterPubStmt->action = isAdd ? AP_AddObjects : AP_DropObjects;
#else
/* before postgres 15, only full tables are supported */
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *tableName = get_rel_name(relationId);
RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1);
alterPubStmt->tables = lappend(alterPubStmt->tables, rangeVar);
alterPubStmt->tableAction = isAdd ? DEFELEM_ADD : DEFELEM_DROP;
#endif
/* we take the WHERE clause from the catalog where it is already transformed */
bool whereClauseNeedsTransform = false;
/*
* We use these commands to restore publications before/after transforming a
* table, including transformations to/from local tables.
*/
bool includeLocalTables = true;
char *command = DeparseAlterPublicationStmtExtended((Node *) alterPubStmt,
whereClauseNeedsTransform,
includeLocalTables);
return command;
}
/*
* AlterPublicationOwnerCommand returns "ALTER PUBLICATION .. OWNER TO .."
* statement for the specified publication.
*/
static char *
AlterPublicationOwnerCommand(Oid publicationId)
{
HeapTuple publicationTuple =
SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(publicationId));
if (!HeapTupleIsValid(publicationTuple))
{
ereport(ERROR, (errmsg("cannot find publication with oid: %d",
publicationId)));
}
Form_pg_publication publicationForm =
(Form_pg_publication) GETSTRUCT(publicationTuple);
char *publicationName = NameStr(publicationForm->pubname);
Oid publicationOwnerId = publicationForm->pubowner;
char *publicationOwnerName = GetUserNameFromId(publicationOwnerId, false);
StringInfo alterCommand = makeStringInfo();
appendStringInfo(alterCommand, "ALTER PUBLICATION %s OWNER TO %s",
quote_identifier(publicationName),
quote_identifier(publicationOwnerName));
ReleaseSysCache(publicationTuple);
return alterCommand->data;
}
/*
* ShouldPropagateCreatePublication tests if we need to propagate a CREATE PUBLICATION
* statement.
*/
static bool
ShouldPropagateCreatePublication(CreatePublicationStmt *stmt)
{
if (!ShouldPropagate())
{
return false;
}
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return false;
}
return true;
}
/*
* AlterPublicationStmtObjectAddress generates the object address for the
* publication altered by a regular ALTER PUBLICATION .. statement.
*/
List *
AlterPublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess)
{
AlterPublicationStmt *stmt = castNode(AlterPublicationStmt, node);
return ObjectAddressForPublicationName(stmt->pubname, missingOk);
}
/*
* AlterPublicationOwnerStmtObjectAddress generates the object address for the
* publication altered by the given ALTER PUBLICATION .. OWNER TO statement.
*/
List *
AlterPublicationOwnerStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
return ObjectAddressForPublicationName(strVal(stmt->object), missingOk);
}
/*
* CreatePublicationStmtObjectAddress generates the object address for the
* publication created by the given CREATE PUBLICATION statement.
*/
List *
CreatePublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess)
{
CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node);
return ObjectAddressForPublicationName(stmt->pubname, missingOk);
}
/*
* RenamePublicationStmtObjectAddress generates the object address for the
* publication altered by the given ALter PUBLICATION .. RENAME TO statement.
*/
List *
RenamePublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostprocess)
{
RenameStmt *stmt = castNode(RenameStmt, node);
return ObjectAddressForPublicationName(strVal(stmt->object), missingOk);
}
/*
* ObjectAddressForPublicationName returns the object address for a given publication
* name.
*/
static List *
ObjectAddressForPublicationName(char *publicationName, bool missingOk)
{
Oid publicationId = InvalidOid;
HeapTuple publicationTuple =
SearchSysCache1(PUBLICATIONNAME, CStringGetDatum(publicationName));
if (HeapTupleIsValid(publicationTuple))
{
Form_pg_publication publicationForm =
(Form_pg_publication) GETSTRUCT(publicationTuple);
publicationId = publicationForm->oid;
ReleaseSysCache(publicationTuple);
}
else if (!missingOk)
{
/* it should have just been created */
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("publication \"%s\" does not exist", publicationName)));
}
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*address, PublicationRelationId, publicationId);
return list_make1(address);
}

View File

@ -0,0 +1,690 @@
/*-------------------------------------------------------------------------
*
* deparse_publication_stmts.c
* All routines to deparse publication statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/relation.h"
#include "catalog/namespace.h"
#include "commands/defrem.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/namespace_utils.h"
#include "lib/stringinfo.h"
#include "parser/parse_clause.h"
#include "parser/parse_collate.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "nodes/value.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/ruleutils.h"
static void AppendCreatePublicationStmt(StringInfo buf, CreatePublicationStmt *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables);
#if (PG_VERSION_NUM >= PG_VERSION_15)
static bool AppendPublicationObjects(StringInfo buf, List *publicationObjects,
bool whereClauseNeedsTransform,
bool includeLocalTables);
static void AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName,
Node *whereClause,
bool whereClauseNeedsTransform);
static void AppendAlterPublicationAction(StringInfo buf, AlterPublicationAction action);
#else
static bool AppendTables(StringInfo buf, List *tables, bool includeLocalTables);
static void AppendDefElemAction(StringInfo buf, DefElemAction action);
#endif
static bool AppendAlterPublicationStmt(StringInfo buf, AlterPublicationStmt *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables);
static void AppendDropPublicationStmt(StringInfo buf, DropStmt *stmt);
static void AppendRenamePublicationStmt(StringInfo buf, RenameStmt *stmt);
static void AppendAlterPublicationOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendPublicationOptions(StringInfo stringBuffer, List *optionList);
static void AppendIdentifierList(StringInfo buf, List *objects);
/*
* DeparseCreatePublicationStmt builds and returns a string representing a
* CreatePublicationStmt.
*/
char *
DeparseCreatePublicationStmt(Node *node)
{
/* regular deparsing function takes CREATE PUBLICATION from the parser */
bool whereClauseNeedsTransform = false;
/* for regular CREATE PUBLICATION we do not propagate local tables */
bool includeLocalTables = false;
return DeparseCreatePublicationStmtExtended(node, whereClauseNeedsTransform,
includeLocalTables);
}
/*
* DeparseCreatePublicationStmtExtended builds and returns a string representing a
* CreatePublicationStmt, which may have already-transformed expressions.
*/
char *
DeparseCreatePublicationStmtExtended(Node *node, bool whereClauseNeedsTransform,
bool includeLocalTables)
{
CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
AppendCreatePublicationStmt(&str, stmt, whereClauseNeedsTransform,
includeLocalTables);
return str.data;
}
/*
* AppendCreatePublicationStmt appends a string representing a
* CreatePublicationStmt to a buffer.
*/
static void
AppendCreatePublicationStmt(StringInfo buf, CreatePublicationStmt *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables)
{
appendStringInfo(buf, "CREATE PUBLICATION %s",
quote_identifier(stmt->pubname));
if (stmt->for_all_tables)
{
appendStringInfoString(buf, " FOR ALL TABLES");
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
else if (stmt->pubobjects != NIL)
{
bool hasObjects = false;
PublicationObjSpec *publicationObject = NULL;
/*
* Check whether there are objects to propagate, mainly to know whether
* we should include "FOR".
*/
foreach_ptr(publicationObject, stmt->pubobjects)
{
if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE)
{
/* FOR TABLE ... */
PublicationTable *publicationTable = publicationObject->pubtable;
if (includeLocalTables ||
IsCitusTableRangeVar(publicationTable->relation, NoLock, false))
{
hasObjects = true;
break;
}
}
else
{
hasObjects = true;
break;
}
}
if (hasObjects)
{
appendStringInfoString(buf, " FOR");
AppendPublicationObjects(buf, stmt->pubobjects, whereClauseNeedsTransform,
includeLocalTables);
}
}
#else
else if (stmt->tables != NIL)
{
bool hasTables = false;
RangeVar *rangeVar = NULL;
/*
* Check whether there are tables to propagate, mainly to know whether
* we should include "FOR".
*/
foreach_ptr(rangeVar, stmt->tables)
{
if (includeLocalTables || IsCitusTableRangeVar(rangeVar, NoLock, false))
{
hasTables = true;
break;
}
}
if (hasTables)
{
appendStringInfoString(buf, " FOR");
AppendTables(buf, stmt->tables, includeLocalTables);
}
}
#endif
if (stmt->options != NIL)
{
appendStringInfoString(buf, " WITH (");
AppendPublicationOptions(buf, stmt->options);
appendStringInfoString(buf, ")");
}
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
/*
* AppendPublicationObjects appends a string representing a list of publication
* objects to a buffer.
*
* For instance: TABLE users, departments, TABLES IN SCHEMA production
*/
static bool
AppendPublicationObjects(StringInfo buf, List *publicationObjects,
bool whereClauseNeedsTransform,
bool includeLocalTables)
{
PublicationObjSpec *publicationObject = NULL;
bool appendedObject = false;
foreach_ptr(publicationObject, publicationObjects)
{
if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE)
{
/* FOR TABLE ... */
PublicationTable *publicationTable = publicationObject->pubtable;
RangeVar *rangeVar = publicationTable->relation;
char *schemaName = rangeVar->schemaname;
char *tableName = rangeVar->relname;
if (!includeLocalTables && !IsCitusTableRangeVar(rangeVar, NoLock, false))
{
/* do not propagate local tables */
continue;
}
if (schemaName != NULL)
{
/* qualified table name */
appendStringInfo(buf, "%s TABLE %s",
appendedObject ? "," : "",
quote_qualified_identifier(schemaName, tableName));
}
else
{
/* unqualified table name */
appendStringInfo(buf, "%s TABLE %s",
appendedObject ? "," : "",
quote_identifier(tableName));
}
if (publicationTable->columns != NIL)
{
appendStringInfoString(buf, " (");
AppendIdentifierList(buf, publicationTable->columns);
appendStringInfoString(buf, ")");
}
if (publicationTable->whereClause != NULL)
{
appendStringInfoString(buf, " WHERE (");
AppendWhereClauseExpression(buf, rangeVar,
publicationTable->whereClause,
whereClauseNeedsTransform);
appendStringInfoString(buf, ")");
}
}
else
{
/* FOR TABLES IN SCHEMA */
char *schemaName = publicationObject->name;
if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA)
{
List *searchPath = fetch_search_path(false);
if (searchPath == NIL)
{
ereport(ERROR, errcode(ERRCODE_UNDEFINED_SCHEMA),
errmsg("no schema has been selected for "
"CURRENT_SCHEMA"));
}
schemaName = get_namespace_name(linitial_oid(searchPath));
}
appendStringInfo(buf, "%s TABLES IN SCHEMA %s",
appendedObject ? "," : "",
quote_identifier(schemaName));
}
appendedObject = true;
}
return appendedObject;
}
/*
* AppendWhereClauseExpression appends a deparsed expression that can
* contain a filter on the given table. If whereClauseNeedsTransform is set
* the expression is first tranformed.
*/
static void
AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName,
Node *whereClause, bool whereClauseNeedsTransform)
{
Relation relation = relation_openrv(tableName, AccessShareLock);
if (whereClauseNeedsTransform)
{
ParseState *pstate = make_parsestate(NULL);
pstate->p_sourcetext = "";
ParseNamespaceItem *nsitem = addRangeTableEntryForRelation(pstate,
relation,
AccessShareLock, NULL,
false, false);
addNSItemToQuery(pstate, nsitem, false, true, true);
whereClause = transformWhereClause(pstate,
copyObject(whereClause),
EXPR_KIND_WHERE,
"PUBLICATION WHERE");
assign_expr_collations(pstate, whereClause);
}
List *relationContext = deparse_context_for(tableName->relname, relation->rd_id);
PushOverrideEmptySearchPath(CurrentMemoryContext);
char *whereClauseString = deparse_expression(whereClause,
relationContext,
true, true);
PopOverrideSearchPath();
appendStringInfoString(buf, whereClauseString);
relation_close(relation, AccessShareLock);
}
#else
/*
* AppendPublicationObjects appends a string representing a list of publication
* objects to a buffer.
*
* For instance: TABLE users, departments
*/
static bool
AppendTables(StringInfo buf, List *tables, bool includeLocalTables)
{
RangeVar *rangeVar = NULL;
bool appendedObject = false;
foreach_ptr(rangeVar, tables)
{
if (!includeLocalTables &&
!IsCitusTableRangeVar(rangeVar, NoLock, false))
{
/* do not propagate local tables */
continue;
}
char *schemaName = rangeVar->schemaname;
char *tableName = rangeVar->relname;
if (schemaName != NULL)
{
/* qualified table name */
appendStringInfo(buf, "%s %s",
appendedObject ? "," : " TABLE",
quote_qualified_identifier(schemaName, tableName));
}
else
{
/* unqualified table name */
appendStringInfo(buf, "%s %s",
appendedObject ? "," : " TABLE",
quote_identifier(tableName));
}
appendedObject = true;
}
return appendedObject;
}
#endif
/*
* DeparseAlterPublicationSchemaStmt builds and returns a string representing
* an AlterPublicationStmt.
*/
char *
DeparseAlterPublicationStmt(Node *node)
{
/* regular deparsing function takes ALTER PUBLICATION from the parser */
bool whereClauseNeedsTransform = true;
/* for regular ALTER PUBLICATION we do not propagate local tables */
bool includeLocalTables = false;
return DeparseAlterPublicationStmtExtended(node, whereClauseNeedsTransform,
includeLocalTables);
}
/*
* DeparseAlterPublicationStmtExtended builds and returns a string representing a
* AlterPublicationStmt, which may have already-transformed expressions.
*/
char *
DeparseAlterPublicationStmtExtended(Node *node, bool whereClauseNeedsTransform,
bool includeLocalTables)
{
AlterPublicationStmt *stmt = castNode(AlterPublicationStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
if (!AppendAlterPublicationStmt(&str, stmt, whereClauseNeedsTransform,
includeLocalTables))
{
Assert(!includeLocalTables);
/*
* When there are no objects to propagate, then there is no
* valid ALTER PUBLICATION to construct.
*/
return NULL;
}
return str.data;
}
/*
* AppendAlterPublicationStmt appends a string representing an AlterPublicationStmt
* of the form ALTER PUBLICATION .. ADD/SET/DROP
*/
static bool
AppendAlterPublicationStmt(StringInfo buf, AlterPublicationStmt *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables)
{
appendStringInfo(buf, "ALTER PUBLICATION %s",
quote_identifier(stmt->pubname));
if (stmt->options)
{
appendStringInfoString(buf, " SET (");
AppendPublicationOptions(buf, stmt->options);
appendStringInfoString(buf, ")");
/* changing options cannot be combined with other actions */
return true;
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
AppendAlterPublicationAction(buf, stmt->action);
return AppendPublicationObjects(buf, stmt->pubobjects, whereClauseNeedsTransform,
includeLocalTables);
#else
AppendDefElemAction(buf, stmt->tableAction);
return AppendTables(buf, stmt->tables, includeLocalTables);
#endif
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
/*
* AppendAlterPublicationAction appends a string representing an AlterPublicationAction
* to a buffer.
*/
static void
AppendAlterPublicationAction(StringInfo buf, AlterPublicationAction action)
{
switch (action)
{
case AP_AddObjects:
{
appendStringInfoString(buf, " ADD");
break;
}
case AP_DropObjects:
{
appendStringInfoString(buf, " DROP");
break;
}
case AP_SetObjects:
{
appendStringInfoString(buf, " SET");
break;
}
default:
{
ereport(ERROR, (errmsg("unrecognized publication action: %d", action)));
}
}
}
#else
/*
* AppendDefElemAction appends a string representing a DefElemAction
* to a buffer.
*/
static void
AppendDefElemAction(StringInfo buf, DefElemAction action)
{
switch (action)
{
case DEFELEM_ADD:
{
appendStringInfoString(buf, " ADD");
break;
}
case DEFELEM_DROP:
{
appendStringInfoString(buf, " DROP");
break;
}
case DEFELEM_SET:
{
appendStringInfoString(buf, " SET");
break;
}
default:
{
ereport(ERROR, (errmsg("unrecognized publication action: %d", action)));
}
}
}
#endif
/*
* DeparseDropPublicationStmt builds and returns a string representing the DropStmt
*/
char *
DeparseDropPublicationStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->removeType == OBJECT_PUBLICATION);
AppendDropPublicationStmt(&str, stmt);
return str.data;
}
/*
* AppendDropPublicationStmt appends a string representing the DropStmt to a buffer
*/
static void
AppendDropPublicationStmt(StringInfo buf, DropStmt *stmt)
{
appendStringInfoString(buf, "DROP PUBLICATION ");
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
AppendIdentifierList(buf, stmt->objects);
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
}
/*
* DeparseRenamePublicationStmt builds and returns a string representing the RenameStmt
*/
char *
DeparseRenamePublicationStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->renameType == OBJECT_PUBLICATION);
AppendRenamePublicationStmt(&str, stmt);
return str.data;
}
/*
* AppendRenamePublicationStmt appends a string representing the RenameStmt to a buffer
*/
static void
AppendRenamePublicationStmt(StringInfo buf, RenameStmt *stmt)
{
appendStringInfo(buf, "ALTER PUBLICATION %s RENAME TO %s;",
quote_identifier(strVal(stmt->object)),
quote_identifier(stmt->newname));
}
/*
* DeparseAlterPublicationOwnerStmt builds and returns a string representing the AlterOwnerStmt
*/
char *
DeparseAlterPublicationOwnerStmt(Node *node)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->objectType == OBJECT_PUBLICATION);
AppendAlterPublicationOwnerStmt(&str, stmt);
return str.data;
}
/*
* AppendAlterPublicationOwnerStmt appends a string representing the AlterOwnerStmt to a buffer
*/
static void
AppendAlterPublicationOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
{
Assert(stmt->objectType == OBJECT_PUBLICATION);
appendStringInfo(buf, "ALTER PUBLICATION %s OWNER TO %s;",
quote_identifier(strVal(stmt->object)),
RoleSpecString(stmt->newowner, true));
}
/*
* AppendPublicationOptions appends a string representing a list of publication opions.
*/
static void
AppendPublicationOptions(StringInfo stringBuffer, List *optionList)
{
ListCell *optionCell = NULL;
bool firstOptionPrinted = false;
foreach(optionCell, optionList)
{
DefElem *option = (DefElem *) lfirst(optionCell);
char *optionName = option->defname;
char *optionValue = defGetString(option);
NodeTag valueType = nodeTag(option->arg);
if (firstOptionPrinted)
{
appendStringInfo(stringBuffer, ", ");
}
firstOptionPrinted = true;
appendStringInfo(stringBuffer, "%s = ",
quote_identifier(optionName));
#if (PG_VERSION_NUM >= PG_VERSION_15)
if (valueType == T_Integer || valueType == T_Float || valueType == T_Boolean)
#else
if (valueType == T_Integer || valueType == T_Float)
#endif
{
/* string escaping is unnecessary for numeric types and can cause issues */
appendStringInfo(stringBuffer, "%s", optionValue);
}
else
{
appendStringInfo(stringBuffer, "%s", quote_literal_cstr(optionValue));
}
}
}
/*
* AppendIdentifierList appends a string representing a list of
* identifiers (of String type).
*/
static void
AppendIdentifierList(StringInfo buf, List *objects)
{
ListCell *objectCell = NULL;
foreach(objectCell, objects)
{
char *name = strVal(lfirst(objectCell));
if (objectCell != list_head(objects))
{
appendStringInfo(buf, ", ");
}
appendStringInfoString(buf, quote_identifier(name));
}
}

View File

@ -0,0 +1,119 @@
/*-------------------------------------------------------------------------
*
* qualify_publication_stmt.c
* Functions specialized in fully qualifying all publication statements. These
* functions are dispatched from qualify.c
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "nodes/nodes.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#if (PG_VERSION_NUM >= PG_VERSION_15)
static void QualifyPublicationObjects(List *publicationObjects);
#else
static void QualifyTables(List *tables);
#endif
static void QualifyPublicationRangeVar(RangeVar *publication);
/*
* QualifyCreatePublicationStmt quailifies the publication names of the
* CREATE PUBLICATION statement.
*/
void
QualifyCreatePublicationStmt(Node *node)
{
CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node);
#if (PG_VERSION_NUM >= PG_VERSION_15)
QualifyPublicationObjects(stmt->pubobjects);
#else
QualifyTables(stmt->tables);
#endif
}
#if (PG_VERSION_NUM >= PG_VERSION_15)
/*
* QualifyPublicationObjects ensures all table names in a list of
* publication objects are fully qualified.
*/
static void
QualifyPublicationObjects(List *publicationObjects)
{
PublicationObjSpec *publicationObject = NULL;
foreach_ptr(publicationObject, publicationObjects)
{
if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE)
{
/* FOR TABLE ... */
PublicationTable *publicationTable = publicationObject->pubtable;
QualifyPublicationRangeVar(publicationTable->relation);
}
}
}
#else
/*
* QualifyTables ensures all table names in a list are fully qualified.
*/
static void
QualifyTables(List *tables)
{
RangeVar *rangeVar = NULL;
foreach_ptr(rangeVar, tables)
{
QualifyPublicationRangeVar(rangeVar);
}
}
#endif
/*
* QualifyPublicationObjects ensures all table names in a list of
* publication objects are fully qualified.
*/
void
QualifyAlterPublicationStmt(Node *node)
{
AlterPublicationStmt *stmt = castNode(AlterPublicationStmt, node);
#if (PG_VERSION_NUM >= PG_VERSION_15)
QualifyPublicationObjects(stmt->pubobjects);
#else
QualifyTables(stmt->tables);
#endif
}
/*
* QualifyPublicationRangeVar qualifies the given publication RangeVar if it is not qualified.
*/
static void
QualifyPublicationRangeVar(RangeVar *publication)
{
if (publication->schemaname == NULL)
{
Oid publicationOid = RelnameGetRelid(publication->relname);
Oid schemaOid = get_rel_namespace(publicationOid);
publication->schemaname = get_namespace_name(schemaOid);
}
}

View File

@ -802,6 +802,11 @@ GetObjectTypeString(ObjectType objType)
return "function";
}
case OBJECT_PUBLICATION:
{
return "publication";
}
case OBJECT_SCHEMA:
{
return "schema";

View File

@ -132,6 +132,7 @@ typedef struct ViewDependencyNode
static List * GetRelationSequenceDependencyList(Oid relationId);
static List * GetRelationFunctionDependencyList(Oid relationId);
static List * GetRelationTriggerFunctionDependencyList(Oid relationId);
static List * GetPublicationRelationsDependencyList(Oid relationId);
static List * GetRelationStatsSchemaDependencyList(Oid relationId);
static List * GetRelationIndicesDependencyList(Oid relationId);
static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid objectId);
@ -722,6 +723,11 @@ SupportedDependencyByCitus(const ObjectAddress *address)
return true;
}
case OCLASS_PUBLICATION:
{
return true;
}
case OCLASS_TSCONFIG:
{
return true;
@ -1656,6 +1662,36 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
List *ruleRefDepList = GetViewRuleReferenceDependencyList(relationId);
result = list_concat(result, ruleRefDepList);
}
break;
}
case PublicationRelationId:
{
Oid publicationId = target.objectId;
/*
* Publications do not depend directly on relations, because dropping
* the relation will only remove it from the publications. However,
* we add a dependency to ensure the relation is created first when
* adding a node.
*/
List *relationDependencyList =
GetPublicationRelationsDependencyList(publicationId);
result = list_concat(result, relationDependencyList);
/*
* As of PostgreSQL 15, the same applies to schemas.
*/
#if PG_VERSION_NUM >= PG_VERSION_15
List *schemaIdList =
GetPublicationSchemas(publicationId);
List *schemaDependencyList =
CreateObjectAddressDependencyDefList(NamespaceRelationId, schemaIdList);
result = list_concat(result, schemaDependencyList);
#endif
break;
}
default:
@ -1923,6 +1959,33 @@ GetRelationTriggerFunctionDependencyList(Oid relationId)
}
/*
* GetPublicationRelationsDependencyList creates a list of ObjectAddressDependencies for
* a publication on the Citus relations it contains. This helps make sure we distribute
* Citus tables before local tables.
*/
static List *
GetPublicationRelationsDependencyList(Oid publicationId)
{
List *allRelationIds = GetPublicationRelations(publicationId, PUBLICATION_PART_ROOT);
List *citusRelationIds = NIL;
Oid relationId = InvalidOid;
foreach_oid(relationId, allRelationIds)
{
if (!IsCitusTable(relationId))
{
continue;
}
citusRelationIds = lappend_oid(citusRelationIds, relationId);
}
return CreateObjectAddressDependencyDefList(RelationRelationId, citusRelationIds);
}
/*
* GetTypeConstraintDependencyDefinition creates a list of constraint dependency
* definitions for a given type

View File

@ -590,6 +590,18 @@ IsCitusTable(Oid relationId)
}
/*
* IsCitusTableRangeVar returns whether the table named in the given
* rangeVar is a Citus table.
*/
bool
IsCitusTableRangeVar(RangeVar *rangeVar, LOCKMODE lockMode, bool missingOK)
{
Oid relationId = RangeVarGetRelid(rangeVar, lockMode, missingOK);
return IsCitusTable(relationId);
}
/*
* IsCitusTableViaCatalog returns whether the given relation is a
* distributed table or not.

View File

@ -100,6 +100,7 @@ static bool HasMetadataWorkers(void);
static void CreateShellTableOnWorkers(Oid relationId);
static void CreateTableMetadataOnWorkers(Oid relationId);
static void CreateDependingViewsOnWorkers(Oid relationId);
static void AddTableToPublications(Oid relationId);
static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
bool citusTableWithNoDistKey);
@ -302,7 +303,8 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
* Our definition of metadata includes the shell table and its inter relations with
* other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard
* and pg_dist_shard placement entries. This function also propagates the views that
* depend on the given relation, to the metadata workers.
* depend on the given relation, to the metadata workers, and adds the relation to
* the appropriate publications.
*/
void
SyncCitusTableMetadata(Oid relationId)
@ -319,6 +321,7 @@ SyncCitusTableMetadata(Oid relationId)
}
CreateDependingViewsOnWorkers(relationId);
AddTableToPublications(relationId);
}
@ -364,6 +367,49 @@ CreateDependingViewsOnWorkers(Oid relationId)
}
/*
* AddTableToPublications adds the table to a publication on workers with metadata.
*/
static void
AddTableToPublications(Oid relationId)
{
List *publicationIds = GetRelationPublications(relationId);
if (publicationIds == NIL)
{
return;
}
Oid publicationId = InvalidOid;
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
foreach_oid(publicationId, publicationIds)
{
ObjectAddress *publicationAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*publicationAddress, PublicationRelationId, publicationId);
List *addresses = list_make1(publicationAddress);
if (!ShouldPropagateAnyObject(addresses))
{
/* skip non-distributed publications */
continue;
}
/* ensure schemas exist */
EnsureAllObjectDependenciesExistOnAllNodes(addresses);
bool isAdd = true;
char *alterPublicationCommand =
GetAlterPublicationTableDDLCommand(publicationId, relationId, isAdd);
/* send ALTER PUBLICATION .. ADD to workers with metadata */
SendCommandToWorkersWithMetadata(alterPublicationCommand);
}
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
}
/*
* EnsureSequentialModeMetadataOperations makes sure that the current transaction is
* already in sequential mode, or can still safely be put in sequential mode,

View File

@ -425,6 +425,7 @@ ErrorIfCurrentUserCanNotDistributeObject(char *textType, ObjectType type,
case OBJECT_COLLATION:
case OBJECT_VIEW:
case OBJECT_ROLE:
case OBJECT_PUBLICATION:
{
check_object_ownership(userId, type, *addr, node, *relation);
break;

View File

@ -88,6 +88,8 @@ static const char *replicationSlotPrefix[] = {
* IMPORTANT: All the subscription names should start with "citus_". Otherwise
* our utility hook does not defend against non-superusers altering or dropping
* them, which is important for security purposes.
*
* We should also keep these in sync with IsCitusShardTransferBackend().
*/
static const char *subscriptionPrefix[] = {
[SHARD_MOVE] = "citus_shard_move_subscription_",
@ -1338,7 +1340,9 @@ CreatePublications(MultiConnection *connection,
worker->groupId,
CLEANUP_ALWAYS);
ExecuteCriticalRemoteCommand(connection, DISABLE_DDL_PROPAGATION);
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
ExecuteCriticalRemoteCommand(connection, ENABLE_DDL_PROPAGATION);
pfree(createPublicationCommand->data);
pfree(createPublicationCommand);
}

View File

@ -1,4 +1,9 @@
-- citus--11.2-1--11.3-1
#include "udfs/repl_origin_helper/11.3-1.sql"
-- bump version to 11.3-1
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY USING INDEX pg_dist_rebalance_strategy_name_key;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY USING INDEX pg_dist_shard_shardid_index;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_transaction_unique_constraint;

View File

@ -1,4 +1,12 @@
-- citus--11.3-1--11.2-1
DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active();
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;

View File

@ -1452,6 +1452,21 @@ IsExternalClientBackend(void)
}
/*
* IsRebalancerInitiatedBackend returns true if we are in a backend that citus
* rebalancer initiated.
*/
bool
IsCitusShardTransferBackend(void)
{
int prefixLength = strlen(CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX);
return strncmp(application_name,
CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX,
prefixLength) == 0;
}
/*
* DetermineCitusBackendType determines the type of backend based on the application_name.
*/

View File

@ -35,8 +35,22 @@
#include "distributed/worker_create_or_replace.h"
#include "distributed/worker_protocol.h"
/*
* OnCollisionAction describes what to do when the created object
* and existing object do not match.
*/
typedef enum OnCollisionAction
{
ON_COLLISION_RENAME,
ON_COLLISION_DROP
} OnCollisionAction;
static List * CreateStmtListByObjectAddress(const ObjectAddress *address);
static bool CompareStringList(List *list1, List *list2);
static OnCollisionAction GetOnCollisionAction(const ObjectAddress *address);
PG_FUNCTION_INFO_V1(worker_create_or_replace_object);
PG_FUNCTION_INFO_V1(worker_create_or_replace_object_array);
@ -192,7 +206,8 @@ WorkerCreateOrReplaceObject(List *sqlStatements)
/*
* Object with name from statement is already found locally, check if states are
* identical. If objects differ we will rename the old object (non- destructively)
* as to make room to create the new object according to the spec sent.
* or drop it (if safe) as to make room to create the new object according to the
* spec sent.
*/
/*
@ -213,11 +228,22 @@ WorkerCreateOrReplaceObject(List *sqlStatements)
return false;
}
char *newName = GenerateBackupNameForCollision(address);
Node *utilityStmt = NULL;
RenameStmt *renameStmt = CreateRenameStatement(address, newName);
const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt,
if (GetOnCollisionAction(address) == ON_COLLISION_DROP)
{
/* drop the existing object */
utilityStmt = (Node *) CreateDropStmt(address);
}
else
{
/* rename the existing object */
char *newName = GenerateBackupNameForCollision(address);
utilityStmt = (Node *) CreateRenameStatement(address, newName);
}
const char *commandString = DeparseTreeNode(utilityStmt);
ProcessUtilityParseTree(utilityStmt, commandString,
PROCESS_UTILITY_QUERY,
NULL, None_Receiver, NULL);
}
@ -286,6 +312,11 @@ CreateStmtListByObjectAddress(const ObjectAddress *address)
return list_make1(GetFunctionDDLCommand(address->objectId, false));
}
case OCLASS_PUBLICATION:
{
return list_make1(CreatePublicationDDLCommand(address->objectId));
}
case OCLASS_TSCONFIG:
{
List *stmts = GetCreateTextSearchConfigStatements(address);
@ -312,6 +343,37 @@ CreateStmtListByObjectAddress(const ObjectAddress *address)
}
/*
* GetOnCollisionAction decides what to do when the object already exists.
*/
static OnCollisionAction
GetOnCollisionAction(const ObjectAddress *address)
{
switch (getObjectClass(address))
{
case OCLASS_PUBLICATION:
{
/*
* We prefer to drop publications because they can be
* harmful (cause update/delete failures) and are relatively
* safe to drop.
*/
return ON_COLLISION_DROP;
}
case OCLASS_COLLATION:
case OCLASS_PROC:
case OCLASS_TSCONFIG:
case OCLASS_TSDICT:
case OCLASS_TYPE:
default:
{
return ON_COLLISION_RENAME;
}
}
}
/*
* GenerateBackupNameForCollision calculate a backup name for a given object by its
* address. This name should be used when renaming an existing object before creating the
@ -362,6 +424,64 @@ GenerateBackupNameForCollision(const ObjectAddress *address)
}
/*
* CreateDropPublicationStmt creates a DROP PUBLICATION statement for the
* publication at the given address.
*/
static DropStmt *
CreateDropPublicationStmt(const ObjectAddress *address)
{
Assert(address->classId == PublicationRelationId);
DropStmt *dropStmt = makeNode(DropStmt);
dropStmt->removeType = OBJECT_PUBLICATION;
dropStmt->behavior = DROP_RESTRICT;
HeapTuple publicationTuple =
SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(address->objectId));
if (!HeapTupleIsValid(publicationTuple))
{
ereport(ERROR, (errmsg("cannot find publication with oid: %d",
address->objectId)));
}
Form_pg_publication publicationForm =
(Form_pg_publication) GETSTRUCT(publicationTuple);
char *publicationName = NameStr(publicationForm->pubname);
dropStmt->objects = list_make1(makeString(publicationName));
ReleaseSysCache(publicationTuple);
return dropStmt;
}
/*
* CreateDropStmt returns a DROP statement for the given object.
*/
DropStmt *
CreateDropStmt(const ObjectAddress *address)
{
switch (getObjectClass(address))
{
case OCLASS_PUBLICATION:
{
return CreateDropPublicationStmt(address);
}
default:
{
break;
}
}
ereport(ERROR, (errmsg("unsupported object to construct a drop statement"),
errdetail("unable to generate a parsetree for the drop")));
}
/*
* CreateRenameTypeStmt creates a rename statement for a type based on its ObjectAddress.
* The rename statement will rename the existing object on its address to the value

View File

@ -351,18 +351,17 @@ ShouldHideShardsInternal(void)
return false;
}
}
else if (MyBackendType != B_BACKEND)
else if (MyBackendType != B_BACKEND && MyBackendType != B_WAL_SENDER)
{
/*
* We are aiming only to hide shards from client
* backends or certain background workers(see above),
* not backends like walsender or checkpointer.
*/
return false;
}
if (IsCitusInternalBackend() || IsRebalancerInternalBackend() ||
IsCitusRunCommandBackend())
IsCitusRunCommandBackend() || IsCitusShardTransferBackend())
{
/* we never hide shards from Citus */
return false;

View File

@ -77,6 +77,7 @@ extern bool IsCitusInternalBackend(void);
extern bool IsRebalancerInternalBackend(void);
extern bool IsCitusRunCommandBackend(void);
extern bool IsExternalClientBackend(void);
extern bool IsCitusShardTransferBackend(void);
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999

View File

@ -409,6 +409,24 @@ extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, u
extern void AddRangeTableEntryToQueryCompat(ParseState *parseState, Relation relation);
/* publication.c - forward declarations */
extern List * PostProcessCreatePublicationStmt(Node *node, const char *queryString);
extern List * CreatePublicationDDLCommandsIdempotent(const ObjectAddress *address);
extern char * CreatePublicationDDLCommand(Oid publicationId);
extern List * PreprocessAlterPublicationStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityCtx);
extern List * GetAlterPublicationDDLCommandsForTable(Oid relationId, bool isAdd);
extern char * GetAlterPublicationTableDDLCommand(Oid publicationId, Oid relationId,
bool isAdd);
extern List * AlterPublicationOwnerStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
extern List * AlterPublicationStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
extern List * CreatePublicationStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
extern List * RenamePublicationStmtObjectAddress(Node *node, bool missingOk,
bool isPostProcess);
/* rename.c - forward declarations*/
extern List * PreprocessRenameStmt(Node *renameStmt, const char *renameCommand,
ProcessUtilityContext processUtilityContext);
@ -657,7 +675,6 @@ extern List * PreprocessDropViewStmt(Node *node, const char *queryString,
extern List * DropViewStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess);
extern char * CreateViewDDLCommand(Oid viewOid);
extern List * GetViewCreationCommandsOfTable(Oid relationId);
extern List * GetViewCreationTableDDLCommandsOfTable(Oid relationId);
extern char * AlterViewOwnerCommand(Oid viewOid);
extern char * DeparseViewStmt(Node *node);
extern char * DeparseDropViewStmt(Node *node);

View File

@ -42,6 +42,14 @@
/* application name used for connections made by run_command_on_* */
#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid="
/*
* application name prefix for move/split replication connections.
*
* This application_name is set to the subscription name by logical replication
* workers, so there is no GPID.
*/
#define CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX "citus_shard_"
/* deal with waiteventset errors */
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
#define WAIT_EVENT_SET_INDEX_FAILED -2

View File

@ -210,6 +210,23 @@ extern char * DeparseAlterExtensionStmt(Node *stmt);
/* forward declarations for deparse_database_stmts.c */
extern char * DeparseAlterDatabaseOwnerStmt(Node *node);
/* forward declaration for deparse_publication_stmts.c */
extern char * DeparseCreatePublicationStmt(Node *stmt);
extern char * DeparseCreatePublicationStmtExtended(Node *node,
bool whereClauseNeedsTransform,
bool includeLocalTables);
extern char * DeparseAlterPublicationStmt(Node *stmt);
extern char * DeparseAlterPublicationStmtExtended(Node *stmt,
bool whereClauseNeedsTransform,
bool includeLocalTables);
extern char * DeparseAlterPublicationOwnerStmt(Node *stmt);
extern char * DeparseAlterPublicationSchemaStmt(Node *node);
extern char * DeparseDropPublicationStmt(Node *stmt);
extern char * DeparseRenamePublicationStmt(Node *node);
extern void QualifyCreatePublicationStmt(Node *node);
extern void QualifyAlterPublicationStmt(Node *node);
/* forward declatations for deparse_text_search_stmts.c */
extern void QualifyAlterTextSearchConfigurationOwnerStmt(Node *node);
extern void QualifyAlterTextSearchConfigurationSchemaStmt(Node *node);

View File

@ -147,6 +147,7 @@ extern char * GetTableTypeName(Oid tableId);
extern void SetCreateCitusTransactionLevel(int val);
extern int GetCitusCreationLevel(void);
extern bool IsCitusTable(Oid relationId);
extern bool IsCitusTableRangeVar(RangeVar *rangeVar, LOCKMODE lockMode, bool missingOk);
extern bool IsCitusTableViaCatalog(Oid relationId);
extern char PgDistPartitionViaCatalog(Oid relationId);
extern List * LookupDistShardTuples(Oid relationId);

View File

@ -21,6 +21,7 @@
extern char * WrapCreateOrReplace(const char *sql);
extern char * WrapCreateOrReplaceList(List *sqls);
extern char * GenerateBackupNameForCollision(const ObjectAddress *address);
extern DropStmt * CreateDropStmt(const ObjectAddress *address);
extern RenameStmt * CreateRenameStatement(const ObjectAddress *address, char *newName);
#endif /* WORKER_CREATE_OR_REPLACE_H */

View File

@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -41,12 +41,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,7 +46,7 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
#insert data into the sensors table in the coordinator node before distributing the table.
$node_coordinator->safe_psql('postgres',"
@ -56,7 +56,6 @@ FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -26,7 +26,7 @@ wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
# Create the reference table in the coordinator and cdc client nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_reference_table('reference_table');");
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -56,7 +56,7 @@ FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
create_cdc_publication_and_slots_for_workers(\@workers,'sensors');
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -43,12 +43,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -48,12 +48,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);

View File

@ -195,7 +195,7 @@ sub create_cdc_publication_and_replication_slots_for_citus_cluster {
my $table_names = $_[2];
create_cdc_publication_and_slots_for_coordinator($node_coordinator, $table_names);
create_cdc_publication_and_slots_for_workers($workersref, $table_names);
create_cdc_slots_for_workers($workersref);
}
sub create_cdc_publication_and_slots_for_coordinator {
@ -210,31 +210,7 @@ sub create_cdc_publication_and_slots_for_coordinator {
$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)");
}
sub create_cdc_publication_and_slots_for_workers {
my $workersref = $_[0];
my $table_names = $_[1];
create_cdc_publication_for_workers($workersref, $table_names);
create_cdc_replication_slots_for_workers($workersref);
}
sub create_cdc_publication_for_workers {
my $workersref = $_[0];
my $table_names = $_[1];
for (@$workersref) {
my $pub = $_->safe_psql('postgres',"SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';");
if ($pub ne "") {
$_->safe_psql('postgres',"DROP PUBLICATION IF EXISTS cdc_publication;");
}
if ($table_names eq "all") {
$_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR ALL TABLES;");
} else {
$_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR TABLE $table_names;");
}
}
}
sub create_cdc_replication_slots_for_workers {
sub create_cdc_slots_for_workers {
my $workersref = $_[0];
for (@$workersref) {
my $slot = $_->safe_psql('postgres',"select * from pg_replication_slots where slot_name = 'cdc_replication_slot';");

View File

@ -19,6 +19,7 @@ test: citus_local_tables_ent
test: remove_coordinator
# --------
test: publication
test: logical_replication
test: multi_create_table
test: multi_create_table_superuser

View File

@ -25,7 +25,9 @@ NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipp
-- This allows us to test the cleanup logic at the start of the shard move.
\c - - - :worker_1_port
SET search_path TO logical_replication;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_6830000;
RESET citus.enable_ddl_propagation;
\c - - - :master_port
SET search_path TO logical_replication;
CREATE TABLE dist_6830000(
@ -155,6 +157,13 @@ SELECT count(*) from dist;
100
(1 row)
DROP PUBLICATION citus_shard_move_publication_:postgres_oid;
SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
pg_drop_replication_slot
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
@ -188,3 +197,9 @@ ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE;
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE);
DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid;
DROP SCHEMA logical_replication CASCADE;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)

View File

@ -713,13 +713,16 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
CREATE TABLE publication_test_table(id int);
CREATE PUBLICATION publication_test FOR TABLE publication_test_table;
CREATE OPERATOR === (
LEFTARG = int,
RIGHTARG = int,
FUNCTION = int4eq
);
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false))
AS (VALUES ('operator', ARRAY['===']::text[], ARRAY['int','int']::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: publication object can not be distributed by Citus
ERROR: operator object can not be distributed by Citus
ROLLBACK;
-- Show that citus_internal_add_object_metadata checks the priviliges
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;

View File

@ -22,7 +22,7 @@ SELECT nodeid AS worker_1_id FROM pg_dist_node WHERE nodename = 'localhost' AND
SELECT nodeid AS worker_2_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_2_port;
worker_2_id
---------------------------------------------------------------------
18
35
(1 row)
\gset

View File

@ -444,7 +444,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
(4 rows)
-- or, we set it to walsender
-- the shards and indexes do show up
-- the shards and indexes do not show up
SELECT set_backend_type(9);
NOTICE: backend type switched to: walsender
set_backend_type
@ -452,6 +452,17 @@ NOTICE: backend type switched to: walsender
(1 row)
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname
---------------------------------------------------------------------
test_index
test_table
test_table_102008
test_table_2_1130000
(4 rows)
-- unless the application name starts with citus_shard
SET application_name = 'citus_shard_move';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname
---------------------------------------------------------------------
@ -467,6 +478,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
test_table_2_1130000
(10 rows)
RESET application_name;
-- but, client backends to see the shards
SELECT set_backend_type(3);
NOTICE: backend type switched to: client backend

View File

@ -16,7 +16,7 @@ SELECT nodeid AS worker_1_id FROM pg_dist_node WHERE nodename = 'localhost' AND
SELECT nodeid AS worker_2_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_2_port;
worker_2_id
---------------------------------------------------------------------
18
35
(1 row)
\gset

View File

@ -0,0 +1,379 @@
CREATE SCHEMA publication;
CREATE SCHEMA "publication-1";
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently
SELECT citus_set_coordinator_host('localhost', :master_port);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_2_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
-- create some publications with conflicting names on worker node
-- publication will be different from coordinator
CREATE PUBLICATION "pub-all";
-- publication will be same as coordinator
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');;
\c - - - :master_port
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- do not create publications on worker 2 initially
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- create a non-distributed publication
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete');
RESET citus.enable_ddl_propagation;
ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate');
-- create regular, distributed publications
CREATE PUBLICATION pubempty;
CREATE PUBLICATION pubinsertonly WITH (publish = 'insert');
CREATE PUBLICATION "pub-all" FOR ALL TABLES;
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
-- add worker 2 with publications
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- Check publications on all the nodes, if we see the same publication name twice then its definition differs
-- Note that publications are special in the sense that the coordinator object might differ from
-- worker objects due to the presence of regular tables.
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubempty WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubinsertonly WITH (publish_via_partition_root = ''false'', publish = ''insert'')');
(4 rows)
CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml);
CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int);
CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int);
-- various operations on a publication with only local tables
CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate');
ALTER PUBLICATION pubtables_orig DROP TABLE test;
ALTER PUBLICATION pubtables_orig ADD TABLE test;
-- publication will be empty on worker nodes, since all tables are local
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables_orig WITH (publish_via_partition_root = ''false'', publish = ''insert, truncate'')');
(1 row)
-- distribute a table, creating a mixed publication
SELECT create_distributed_table('test','x', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- some generic operations
ALTER PUBLICATION pubtables_orig RENAME TO pubtables;
ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete');
ALTER PUBLICATION pubtables OWNER TO postgres;
ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete');
ERROR: unrecognized value for publication option "publish": "inert"
ALTER PUBLICATION pubtables ADD TABLE notexist;
ERROR: relation "notexist" does not exist
-- operations with a distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test;
ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs";
-- operations with a local table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
SELECT create_distributed_table('"test-pubs"', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- test and test-pubs will show up in worker nodes
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, TABLE publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete'')');
(1 row)
-- operations with a strangely named distributed table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
-- create a publication with distributed and local tables
DROP PUBLICATION pubtables;
CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs";
-- change distributed tables
SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true);
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('test');
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table_concurrently('test', 'x');
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('"test-pubs"');
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('"test-pubs"');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- publications are unchanged despite various tranformations
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, TABLE publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- partitioned table
CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a);
CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10);
ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey;
CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20);
ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
SELECT create_distributed_table('testpub_partitioned', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')');
(1 row)
DROP PUBLICATION pubpartitioned;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
-- add a partition
ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
ERROR: malformed array literal: ""
DETAIL: Array value must start with "{" or dimension information.
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
DROP PUBLICATION pubempty;
DROP PUBLICATION pubtables;
DROP PUBLICATION pubinsertonly;
DROP PUBLICATION "pub-all-insertupdateonly";
DROP PUBLICATION "pub-all";
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION pubnotdistributed;
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
\q
\endif
-- recreate a mixed publication
CREATE PUBLICATION pubtables FOR TABLE test, "publication-1"."test-pubs";
-- operations on an existing distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test (y);
ALTER PUBLICATION pubtables SET TABLE test WHERE (doc IS DOCUMENT);
ALTER PUBLICATION pubtables SET TABLE test WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE doc));
ALTER PUBLICATION pubtables SET TABLE test WHERE (CASE x WHEN 5 THEN true ELSE false END);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test WHERE (CASE test.x WHEN 5 THEN true ELSE false END) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
ALTER PUBLICATION pubtables SET TABLE test ("column-1", x) WHERE (x > "column-1"), "publication-1"."test-pubs";
-- operations on a local table
ALTER PUBLICATION pubtables DROP TABLE "publication-1"."test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "publication-1"."test-pubs" (y);
-- mixed operations
ALTER PUBLICATION pubtables SET TABLE test, TABLES IN SCHEMA "publication-1", TABLES IN SCHEMA current_schema;
ALTER PUBLICATION pubtables SET TABLE "publication-1"."test-pubs", test ("column-1", x) WHERE (x > "column-1");
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test (x, "column-1") WHERE ((test.x > test."column-1")) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- publication with schemas
CREATE PUBLICATION "pub-mix" FOR TABLE test, TABLES IN SCHEMA current_schema, TABLE "publication-1"."test-pubs", TABLES IN SCHEMA "publication-1";
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pub-mix%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-mix" FOR TABLES IN SCHEMA publication, TABLES IN SCHEMA "publication-1", TABLE publication.test WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- publication on a partitioned table
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned (a, b) WITH (publish_via_partition_root = 'true');
ALTER PUBLICATION pubpartitioned SET (publish_via_partition_root = 1);
SELECT alter_distributed_table('testpub_partitioned', shard_count := 6, cascade_to_colocated := true);
NOTICE: converting the partitions of publication.testpub_partitioned
NOTICE: creating a new table for publication.testpub_partitioned_0
NOTICE: moving the data of publication.testpub_partitioned_0
NOTICE: dropping the old publication.testpub_partitioned_0
NOTICE: renaming the new table to publication.testpub_partitioned_0
NOTICE: creating a new table for publication.testpub_partitioned_1
NOTICE: moving the data of publication.testpub_partitioned_1
NOTICE: dropping the old publication.testpub_partitioned_1
NOTICE: renaming the new table to publication.testpub_partitioned_1
NOTICE: creating a new table for publication.testpub_partitioned
NOTICE: dropping the old publication.testpub_partitioned
NOTICE: renaming the new table to publication.testpub_partitioned
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned (a, b) WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- make sure we propagate schema dependencies
SET citus.create_object_propagation TO 'deferred';
BEGIN;
CREATE SCHEMA deptest;
END;
CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest;
RESET citus.create_object_propagation;
DROP SCHEMA deptest CASCADE;
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
DROP PUBLICATION pubdep;
DROP PUBLICATION "pub-mix";
DROP PUBLICATION pubtables;
DROP PUBLICATION pubpartitioned;
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)

View File

@ -0,0 +1,273 @@
CREATE SCHEMA publication;
CREATE SCHEMA "publication-1";
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently
SELECT citus_set_coordinator_host('localhost', :master_port);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_2_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
-- create some publications with conflicting names on worker node
-- publication will be different from coordinator
CREATE PUBLICATION "pub-all";
-- publication will be same as coordinator
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');;
\c - - - :master_port
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- do not create publications on worker 2 initially
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- create a non-distributed publication
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete');
RESET citus.enable_ddl_propagation;
ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate');
-- create regular, distributed publications
CREATE PUBLICATION pubempty;
CREATE PUBLICATION pubinsertonly WITH (publish = 'insert');
CREATE PUBLICATION "pub-all" FOR ALL TABLES;
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
-- add worker 2 with publications
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- Check publications on all the nodes, if we see the same publication name twice then its definition differs
-- Note that publications are special in the sense that the coordinator object might differ from
-- worker objects due to the presence of regular tables.
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubempty WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubinsertonly WITH (publish_via_partition_root = ''false'', publish = ''insert'')');
(4 rows)
CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml);
CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int);
CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int);
-- various operations on a publication with only local tables
CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate');
ALTER PUBLICATION pubtables_orig DROP TABLE test;
ALTER PUBLICATION pubtables_orig ADD TABLE test;
-- publication will be empty on worker nodes, since all tables are local
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables_orig WITH (publish_via_partition_root = ''false'', publish = ''insert, truncate'')');
(1 row)
-- distribute a table, creating a mixed publication
SELECT create_distributed_table('test','x', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- some generic operations
ALTER PUBLICATION pubtables_orig RENAME TO pubtables;
ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete');
ALTER PUBLICATION pubtables OWNER TO postgres;
ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete');
ERROR: unrecognized "publish" value: "inert"
ALTER PUBLICATION pubtables ADD TABLE notexist;
ERROR: relation "notexist" does not exist
-- operations with a distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test;
ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs";
-- operations with a local table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
SELECT create_distributed_table('"test-pubs"', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- test and test-pubs will show up in worker nodes
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete'')');
(1 row)
-- operations with a strangely named distributed table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
-- create a publication with distributed and local tables
DROP PUBLICATION pubtables;
CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs";
-- change distributed tables
SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true);
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('test');
NOTICE: creating a new table for publication.test
NOTICE: moving the data of publication.test
NOTICE: dropping the old publication.test
NOTICE: renaming the new table to publication.test
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table_concurrently('test', 'x');
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('"test-pubs"');
NOTICE: creating a new table for publication."test-pubs"
NOTICE: moving the data of publication."test-pubs"
NOTICE: dropping the old publication."test-pubs"
NOTICE: renaming the new table to publication."test-pubs"
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('"test-pubs"');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- publications are unchanged despite various tranformations
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- partitioned table
CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a);
CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10);
ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey;
CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20);
ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
SELECT create_distributed_table('testpub_partitioned', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')');
(1 row)
DROP PUBLICATION pubpartitioned;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
-- add a partition
ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
ERROR: malformed array literal: ""
DETAIL: Array value must start with "{" or dimension information.
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
DROP PUBLICATION pubempty;
DROP PUBLICATION pubtables;
DROP PUBLICATION pubinsertonly;
DROP PUBLICATION "pub-all-insertupdateonly";
DROP PUBLICATION "pub-all";
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION pubnotdistributed;
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
\q

View File

@ -61,7 +61,9 @@ SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
@ -261,7 +263,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
@ -428,7 +432,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
@ -597,8 +603,10 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,

View File

@ -21,7 +21,9 @@ SELECT 1 from citus_add_node('localhost', :master_port, groupId := 0);
-- This allows us to test the cleanup logic at the start of the shard move.
\c - - - :worker_1_port
SET search_path TO logical_replication;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_6830000;
RESET citus.enable_ddl_propagation;
\c - - - :master_port
SET search_path TO logical_replication;
@ -72,6 +74,9 @@ SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT count(*) from dist;
DROP PUBLICATION citus_shard_move_publication_:postgres_oid;
SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
\c - - - :worker_2_port
SET search_path TO logical_replication;
@ -88,3 +93,4 @@ ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE;
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE);
DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid;
DROP SCHEMA logical_replication CASCADE;
SELECT public.wait_for_resource_cleanup();

View File

@ -429,12 +429,15 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
CREATE TABLE publication_test_table(id int);
CREATE PUBLICATION publication_test FOR TABLE publication_test_table;
CREATE OPERATOR === (
LEFTARG = int,
RIGHTARG = int,
FUNCTION = int4eq
);
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false))
AS (VALUES ('operator', ARRAY['===']::text[], ARRAY['int','int']::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;

View File

@ -232,10 +232,15 @@ SELECT set_backend_type(4);
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
-- or, we set it to walsender
-- the shards and indexes do show up
-- the shards and indexes do not show up
SELECT set_backend_type(9);
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
-- unless the application name starts with citus_shard
SET application_name = 'citus_shard_move';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
RESET application_name;
-- but, client backends to see the shards
SELECT set_backend_type(3);
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;

View File

@ -0,0 +1,269 @@
CREATE SCHEMA publication;
CREATE SCHEMA "publication-1";
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently
SELECT citus_set_coordinator_host('localhost', :master_port);
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
\c - - - :worker_2_port
SET citus.enable_ddl_propagation TO off;
CREATE OR REPLACE FUNCTION activate_node_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to activate node snapshot';
-- create some publications with conflicting names on worker node
-- publication will be different from coordinator
CREATE PUBLICATION "pub-all";
-- publication will be same as coordinator
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');;
\c - - - :master_port
SET search_path TO publication;
SET citus.shard_replication_factor TO 1;
-- do not create publications on worker 2 initially
SELECT citus_remove_node('localhost', :worker_2_port);
-- create a non-distributed publication
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete');
RESET citus.enable_ddl_propagation;
ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate');
-- create regular, distributed publications
CREATE PUBLICATION pubempty;
CREATE PUBLICATION pubinsertonly WITH (publish = 'insert');
CREATE PUBLICATION "pub-all" FOR ALL TABLES;
CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');
-- add worker 2 with publications
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-- Check publications on all the nodes, if we see the same publication name twice then its definition differs
-- Note that publications are special in the sense that the coordinator object might differ from
-- worker objects due to the presence of regular tables.
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$)
ORDER BY c) s;
CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml);
CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int);
CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int);
-- various operations on a publication with only local tables
CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate');
ALTER PUBLICATION pubtables_orig DROP TABLE test;
ALTER PUBLICATION pubtables_orig ADD TABLE test;
-- publication will be empty on worker nodes, since all tables are local
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- distribute a table, creating a mixed publication
SELECT create_distributed_table('test','x', colocate_with := 'none');
-- some generic operations
ALTER PUBLICATION pubtables_orig RENAME TO pubtables;
ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete');
ALTER PUBLICATION pubtables OWNER TO postgres;
ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete');
ALTER PUBLICATION pubtables ADD TABLE notexist;
-- operations with a distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test;
ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs";
-- operations with a local table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
SELECT create_distributed_table('"test-pubs"', 'x');
-- test and test-pubs will show up in worker nodes
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- operations with a strangely named distributed table in a mixed publication
ALTER PUBLICATION pubtables DROP TABLE "test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "test-pubs";
-- create a publication with distributed and local tables
DROP PUBLICATION pubtables;
CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs";
-- change distributed tables
SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true);
SELECT undistribute_table('test');
SELECT citus_add_local_table_to_metadata('test');
SELECT create_distributed_table_concurrently('test', 'x');
SELECT undistribute_table('"test-pubs"');
SELECT create_reference_table('"test-pubs"');
-- publications are unchanged despite various tranformations
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- partitioned table
CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a);
CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10);
ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey;
CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20);
ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a);
ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
SELECT create_distributed_table('testpub_partitioned', 'a');
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
DROP PUBLICATION pubpartitioned;
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true');
-- add a partition
ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
DROP PUBLICATION pubempty;
DROP PUBLICATION pubtables;
DROP PUBLICATION pubinsertonly;
DROP PUBLICATION "pub-all-insertupdateonly";
DROP PUBLICATION "pub-all";
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION pubnotdistributed;
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);
\q
\endif
-- recreate a mixed publication
CREATE PUBLICATION pubtables FOR TABLE test, "publication-1"."test-pubs";
-- operations on an existing distributed table
ALTER PUBLICATION pubtables DROP TABLE test;
ALTER PUBLICATION pubtables ADD TABLE test (y);
ALTER PUBLICATION pubtables SET TABLE test WHERE (doc IS DOCUMENT);
ALTER PUBLICATION pubtables SET TABLE test WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE doc));
ALTER PUBLICATION pubtables SET TABLE test WHERE (CASE x WHEN 5 THEN true ELSE false END);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
ALTER PUBLICATION pubtables SET TABLE test ("column-1", x) WHERE (x > "column-1"), "publication-1"."test-pubs";
-- operations on a local table
ALTER PUBLICATION pubtables DROP TABLE "publication-1"."test-pubs";
ALTER PUBLICATION pubtables ADD TABLE "publication-1"."test-pubs" (y);
-- mixed operations
ALTER PUBLICATION pubtables SET TABLE test, TABLES IN SCHEMA "publication-1", TABLES IN SCHEMA current_schema;
ALTER PUBLICATION pubtables SET TABLE "publication-1"."test-pubs", test ("column-1", x) WHERE (x > "column-1");
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$)
ORDER BY c) s;
-- publication with schemas
CREATE PUBLICATION "pub-mix" FOR TABLE test, TABLES IN SCHEMA current_schema, TABLE "publication-1"."test-pubs", TABLES IN SCHEMA "publication-1";
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pub-mix%' ORDER BY 1) s$$)
ORDER BY c) s;
-- publication on a partitioned table
CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned (a, b) WITH (publish_via_partition_root = 'true');
ALTER PUBLICATION pubpartitioned SET (publish_via_partition_root = 1);
SELECT alter_distributed_table('testpub_partitioned', shard_count := 6, cascade_to_colocated := true);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$)
ORDER BY c) s;
-- make sure we propagate schema dependencies
SET citus.create_object_propagation TO 'deferred';
BEGIN;
CREATE SCHEMA deptest;
END;
CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest;
RESET citus.create_object_propagation;
DROP SCHEMA deptest CASCADE;
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
DROP PUBLICATION pubdep;
DROP PUBLICATION "pub-mix";
DROP PUBLICATION pubtables;
DROP PUBLICATION pubpartitioned;
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
SELECT citus_remove_node('localhost', :master_port);

View File

@ -64,7 +64,9 @@ CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
@ -176,7 +178,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
@ -282,7 +286,9 @@ SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
@ -401,8 +407,10 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
RESET citus.enable_ddl_propagation;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,