From b09d239809bf7b3a8b83e469720505706d3ad961 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 17 Mar 2023 22:15:01 +0100 Subject: [PATCH] Propagate CREATE PUBLICATION statements --- .../distributed/commands/alter_table.c | 27 +- .../citus_add_local_table_to_metadata.c | 31 + .../distributed/commands/dependencies.c | 5 + .../commands/distribute_object_ops.c | 73 ++ .../distributed/commands/publication.c | 634 ++++++++++++++++ .../deparser/deparse_publication_stmts.c | 690 ++++++++++++++++++ .../deparser/qualify_publication_stmt.c | 119 +++ .../distributed/executor/multi_executor.c | 5 + src/backend/distributed/metadata/dependency.c | 63 ++ .../distributed/metadata/metadata_cache.c | 12 + .../distributed/metadata/metadata_sync.c | 48 +- .../metadata/pg_get_object_address_13_14_15.c | 1 + .../replication/multi_logical_replication.c | 2 + .../distributed/sql/citus--11.2-1--11.3-1.sql | 7 +- .../sql/downgrades/citus--11.3-1--11.2-1.sql | 8 + .../worker/worker_create_or_replace.c | 130 +++- src/include/distributed/commands.h | 19 +- src/include/distributed/deparser.h | 17 + src/include/distributed/metadata_cache.h | 1 + .../distributed/worker_create_or_replace.h | 1 + src/test/regress/enterprise_schedule | 1 + .../regress/expected/logical_replication.out | 15 + .../expected/metadata_sync_helpers.out | 11 +- .../regress/expected/multi_multiuser_auth.out | 2 +- .../regress/expected/multi_poolinfo_usage.out | 2 +- src/test/regress/expected/publication.out | 379 ++++++++++ src/test/regress/expected/publication_0.out | 273 +++++++ src/test/regress/expected/split_shard.out | 8 + src/test/regress/sql/logical_replication.sql | 6 + .../regress/sql/metadata_sync_helpers.sql | 9 +- src/test/regress/sql/publication.sql | 269 +++++++ src/test/regress/sql/split_shard.sql | 8 + 32 files changed, 2850 insertions(+), 26 deletions(-) create mode 100644 src/backend/distributed/commands/publication.c create mode 100644 src/backend/distributed/deparser/deparse_publication_stmts.c create mode 100644 src/backend/distributed/deparser/qualify_publication_stmt.c create mode 100644 src/test/regress/expected/publication.out create mode 100644 src/test/regress/expected/publication_0.out create mode 100644 src/test/regress/sql/publication.sql diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index c0deadd1e..4c1ef8783 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -217,6 +217,7 @@ static bool WillRecreateForeignKeyToReferenceTable(Oid relationId, CascadeToColocatedOption cascadeOption); static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId); static void ErrorIfUnsupportedCascadeObjects(Oid relationId); +static List * WrapTableDDLCommands(List *commandStrings); static bool DoesCascadeDropUnsupportedObject(Oid classId, Oid id, HTAB *nodeMap); static TableConversionReturn * CopyTableConversionReturnIntoCurrentContext( TableConversionReturn *tableConversionReturn); @@ -604,9 +605,18 @@ ConvertTableInternal(TableConversionState *con) List *justBeforeDropCommands = NIL; List *attachPartitionCommands = NIL; - postLoadCommands = - list_concat(postLoadCommands, - GetViewCreationTableDDLCommandsOfTable(con->relationId)); + List *createViewCommands = GetViewCreationCommandsOfTable(con->relationId); + + postLoadCommands = list_concat(postLoadCommands, + WrapTableDDLCommands(createViewCommands)); + + /* need to add back to publications after dropping the original table */ + bool isAdd = true; + List *alterPublicationCommands = + GetAlterPublicationDDLCommandsForTable(con->relationId, isAdd); + + postLoadCommands = list_concat(postLoadCommands, + WrapTableDDLCommands(alterPublicationCommands)); List *foreignKeyCommands = NIL; if (con->conversionType == ALTER_DISTRIBUTED_TABLE) @@ -1493,17 +1503,16 @@ GetViewCreationCommandsOfTable(Oid relationId) /* - * GetViewCreationTableDDLCommandsOfTable is the same as GetViewCreationCommandsOfTable, - * but the returned list includes objects of TableDDLCommand's, not strings. + * WrapTableDDLCommands takes a list of command strings and wraps them + * in TableDDLCommand structs. */ -List * -GetViewCreationTableDDLCommandsOfTable(Oid relationId) +static List * +WrapTableDDLCommands(List *commandStrings) { - List *commands = GetViewCreationCommandsOfTable(relationId); List *tableDDLCommands = NIL; char *command = NULL; - foreach_ptr(command, commands) + foreach_ptr(command, commandStrings) { tableDDLCommands = lappend(tableDDLCommands, makeTableDDLCommandString(command)); } diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index bb4ab7473..a0fc7f6e5 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -85,6 +85,7 @@ static void DropRelationTruncateTriggers(Oid relationId); static char * GetDropTriggerCommand(Oid relationId, char *triggerName); static void DropViewsOnTable(Oid relationId); static void DropIdentitiesOnTable(Oid relationId); +static void DropTableFromPublications(Oid relationId); static List * GetRenameStatsCommandList(List *statsOidList, uint64 shardId); static List * ReversedOidList(List *oidList); static void AppendExplicitIndexIdsToList(Form_pg_index indexForm, @@ -338,6 +339,10 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve List *shellTableDDLEvents = GetShellTableDDLEventsForCitusLocalTable(relationId); List *tableViewCreationCommands = GetViewCreationCommandsOfTable(relationId); + bool isAdd = true; + List *alterPublicationCommands = + GetAlterPublicationDDLCommandsForTable(relationId, isAdd); + char *relationName = get_rel_name(relationId); Oid relationSchemaId = get_rel_namespace(relationId); @@ -347,6 +352,12 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve */ DropIdentitiesOnTable(relationId); + /* + * We do not want the shard to be in the publication (subscribers are + * unlikely to recognize it). + */ + DropTableFromPublications(relationId); + /* below we convert relation with relationId to the shard relation */ uint64 shardId = ConvertLocalTableToShard(relationId); @@ -363,6 +374,11 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve */ ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(tableViewCreationCommands); + /* + * Execute the publication creation commands with the shell table. + */ + ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(alterPublicationCommands); + /* * Set shellRelationId as the relation with relationId now points * to the shard relation. @@ -1163,6 +1179,21 @@ DropIdentitiesOnTable(Oid relationId) } +/* + * DropTableFromPublications drops the table from all of its publications. + */ +static void +DropTableFromPublications(Oid relationId) +{ + bool isAdd = false; + + List *alterPublicationCommands = + GetAlterPublicationDDLCommandsForTable(relationId, isAdd); + + ExecuteAndLogUtilityCommandList(alterPublicationCommands); +} + + /* * DropViewsOnTable drops the views that depend on the given relation. */ diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 735449973..cf7c105cb 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -438,6 +438,11 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) return DDLCommands; } + case OCLASS_PUBLICATION: + { + return CreatePublicationDDLCommandsIdempotent(dependency); + } + case OCLASS_ROLE: { return GenerateCreateOrAlterRoleCommand(dependency->objectId); diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 3f30eaaa2..017cb6537 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -245,6 +245,15 @@ static DistributeObjectOps Any_CreatePolicy = { .address = NULL, .markDistributed = false, }; +static DistributeObjectOps Any_CreatePublication = { + .deparse = DeparseCreatePublicationStmt, + .qualify = QualifyCreatePublicationStmt, + .preprocess = NULL, + .postprocess = PostProcessCreatePublicationStmt, + .operationType = DIST_OPS_CREATE, + .address = CreatePublicationStmtObjectAddress, + .markDistributed = true, +}; static DistributeObjectOps Any_CreateRole = { .deparse = DeparseCreateRoleStmt, .qualify = NULL, @@ -707,6 +716,45 @@ static DistributeObjectOps Procedure_Rename = { .address = RenameFunctionStmtObjectAddress, .markDistributed = false, }; +static DistributeObjectOps Publication_Alter = { + .deparse = DeparseAlterPublicationStmt, + .qualify = QualifyAlterPublicationStmt, + .preprocess = PreprocessAlterPublicationStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_PUBLICATION, + .operationType = DIST_OPS_ALTER, + .address = AlterPublicationStmtObjectAddress, + .markDistributed = false, +}; +static DistributeObjectOps Publication_AlterOwner = { + .deparse = DeparseAlterPublicationOwnerStmt, + .qualify = NULL, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_PUBLICATION, + .operationType = DIST_OPS_ALTER, + .address = AlterPublicationOwnerStmtObjectAddress, + .markDistributed = false, +}; +static DistributeObjectOps Publication_Drop = { + .deparse = DeparseDropPublicationStmt, + .qualify = NULL, + .preprocess = PreprocessDropDistributedObjectStmt, + .postprocess = NULL, + .operationType = DIST_OPS_DROP, + .address = NULL, + .markDistributed = false, +}; +static DistributeObjectOps Publication_Rename = { + .deparse = DeparseRenamePublicationStmt, + .qualify = NULL, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = NULL, + .objectType = OBJECT_PUBLICATION, + .operationType = DIST_OPS_ALTER, + .address = RenamePublicationStmtObjectAddress, + .markDistributed = false, +}; static DistributeObjectOps Routine_AlterObjectDepends = { .deparse = DeparseAlterFunctionDependsStmt, .qualify = QualifyAlterFunctionDependsStmt, @@ -1399,6 +1447,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_AlterOwner; } + case OBJECT_PUBLICATION: + { + return &Publication_AlterOwner; + } + case OBJECT_ROUTINE: { return &Routine_AlterOwner; @@ -1436,6 +1489,11 @@ GetDistributeObjectOps(Node *node) return &Any_AlterPolicy; } + case T_AlterPublicationStmt: + { + return &Publication_Alter; + } + case T_AlterRoleStmt: { return &Any_AlterRole; @@ -1610,6 +1668,11 @@ GetDistributeObjectOps(Node *node) return &Any_CreatePolicy; } + case T_CreatePublicationStmt: + { + return &Any_CreatePublication; + } + case T_CreateRoleStmt: { return &Any_CreateRole; @@ -1722,6 +1785,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_Drop; } + case OBJECT_PUBLICATION: + { + return &Publication_Drop; + } + case OBJECT_ROUTINE: { return &Routine_Drop; @@ -1901,6 +1969,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_Rename; } + case OBJECT_PUBLICATION: + { + return &Publication_Rename; + } + case OBJECT_ROUTINE: { return &Routine_Rename; diff --git a/src/backend/distributed/commands/publication.c b/src/backend/distributed/commands/publication.c new file mode 100644 index 000000000..581f7f874 --- /dev/null +++ b/src/backend/distributed/commands/publication.c @@ -0,0 +1,634 @@ +/*------------------------------------------------------------------------- + * + * publication.c + * Commands for creating publications + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel.h" +#include "distributed/commands.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "distributed/metadata_utility.h" +#include "distributed/metadata_sync.h" +#include "distributed/metadata/distobject.h" +#include "distributed/reference_table_utils.h" +#include "distributed/worker_create_or_replace.h" +#include "nodes/makefuncs.h" +#include "nodes/parsenodes.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" + +#include "pg_version_compat.h" + + +static CreatePublicationStmt * BuildCreatePublicationStmt(Oid publicationId); +#if (PG_VERSION_NUM >= PG_VERSION_15) +static PublicationObjSpec * BuildPublicationRelationObjSpec(Oid relationId, + Oid publicationId, + bool tableOnly); +#endif +static void AppendPublishOptionList(StringInfo str, List *strings); +static char * AlterPublicationOwnerCommand(Oid publicationId); +static bool ShouldPropagateCreatePublication(CreatePublicationStmt *stmt); +static List * ObjectAddressForPublicationName(char *publicationName, bool missingOk); + + +/* + * PostProcessCreatePublicationStmt handles CREATE PUBLICATION statements + * that contain distributed tables. + */ +List * +PostProcessCreatePublicationStmt(Node *node, const char *queryString) +{ + CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node); + + if (!ShouldPropagateCreatePublication(stmt)) + { + /* should not propagate right now */ + return NIL; + } + + /* call into CreatePublicationStmtObjectAddress */ + List *publicationAddresses = GetObjectAddressListFromParseTree(node, false, true); + + /* the code-path only supports a single object */ + Assert(list_length(publicationAddresses) == 1); + + if (IsAnyObjectAddressOwnedByExtension(publicationAddresses, NULL)) + { + /* should not propagate publications owned by extensions */ + return NIL; + } + + EnsureAllObjectDependenciesExistOnAllNodes(publicationAddresses); + + const ObjectAddress *pubAddress = linitial(publicationAddresses); + + List *commands = NIL; + commands = lappend(commands, DISABLE_DDL_PROPAGATION); + commands = lappend(commands, CreatePublicationDDLCommand(pubAddress->objectId)); + commands = lappend(commands, ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + +/* + * CreatePublicationDDLCommandsIdempotent returns a list of DDL statements to be + * executed on a node to recreate the publication addressed by the publicationAddress. + */ +List * +CreatePublicationDDLCommandsIdempotent(const ObjectAddress *publicationAddress) +{ + Assert(publicationAddress->classId == PublicationRelationId); + + char *ddlCommand = + CreatePublicationDDLCommand(publicationAddress->objectId); + + char *alterPublicationOwnerSQL = + AlterPublicationOwnerCommand(publicationAddress->objectId); + + return list_make2( + WrapCreateOrReplace(ddlCommand), + alterPublicationOwnerSQL); +} + + +/* + * CreatePublicationDDLCommand returns the CREATE PUBLICATION string that + * can be used to recreate a given publication. + */ +char * +CreatePublicationDDLCommand(Oid publicationId) +{ + CreatePublicationStmt *createPubStmt = BuildCreatePublicationStmt(publicationId); + + /* we took the WHERE clause from the catalog where it is already transformed */ + bool whereClauseRequiresTransform = false; + + /* only propagate Citus tables in publication */ + bool includeLocalTables = false; + + return DeparseCreatePublicationStmtExtended((Node *) createPubStmt, + whereClauseRequiresTransform, + includeLocalTables); +} + + +/* + * BuildCreatePublicationStmt constructs a CreatePublicationStmt struct for the + * given publication. + */ +static CreatePublicationStmt * +BuildCreatePublicationStmt(Oid publicationId) +{ + CreatePublicationStmt *createPubStmt = makeNode(CreatePublicationStmt); + + HeapTuple publicationTuple = + SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(publicationId)); + + if (!HeapTupleIsValid(publicationTuple)) + { + ereport(ERROR, (errmsg("cannot find publication with oid: %d", publicationId))); + } + + Form_pg_publication publicationForm = + (Form_pg_publication) GETSTRUCT(publicationTuple); + + /* CREATE PUBLICATION */ + createPubStmt->pubname = pstrdup(NameStr(publicationForm->pubname)); + + /* FOR ALL TABLES */ + createPubStmt->for_all_tables = publicationForm->puballtables; + + ReleaseSysCache(publicationTuple); + +#if (PG_VERSION_NUM >= PG_VERSION_15) + List *schemaIds = GetPublicationSchemas(publicationId); + Oid schemaId = InvalidOid; + + foreach_oid(schemaId, schemaIds) + { + char *schemaName = get_namespace_name(schemaId); + + PublicationObjSpec *publicationObject = makeNode(PublicationObjSpec); + publicationObject->pubobjtype = PUBLICATIONOBJ_TABLES_IN_SCHEMA; + publicationObject->pubtable = NULL; + publicationObject->name = schemaName; + publicationObject->location = -1; + + createPubStmt->pubobjects = lappend(createPubStmt->pubobjects, publicationObject); + } +#endif + + List *relationIds = GetPublicationRelations(publicationId, + publicationForm->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + Oid relationId = InvalidOid; + int citusTableCount PG_USED_FOR_ASSERTS_ONLY = 0; + + /* mainly for consistent ordering in test output */ + relationIds = SortList(relationIds, CompareOids); + + foreach_oid(relationId, relationIds) + { +#if (PG_VERSION_NUM >= PG_VERSION_15) + bool tableOnly = false; + + /* since postgres 15, tables can have a column list and filter */ + PublicationObjSpec *publicationObject = + BuildPublicationRelationObjSpec(relationId, publicationId, tableOnly); + + createPubStmt->pubobjects = lappend(createPubStmt->pubobjects, publicationObject); +#else + + /* before postgres 15, only full tables are supported */ + char *schemaName = get_namespace_name(get_rel_namespace(relationId)); + char *tableName = get_rel_name(relationId); + RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1); + + createPubStmt->tables = lappend(createPubStmt->tables, rangeVar); +#endif + + if (IsCitusTable(relationId)) + { + citusTableCount++; + } + } + + /* WITH (publish_via_partition_root = true) option */ + bool publishViaRoot = publicationForm->pubviaroot; + char *publishViaRootString = publishViaRoot ? "true" : "false"; + DefElem *pubViaRootOption = makeDefElem("publish_via_partition_root", + (Node *) makeString(publishViaRootString), + -1); + createPubStmt->options = lappend(createPubStmt->options, pubViaRootOption); + + /* WITH (publish = 'insert, update, delete, truncate') option */ + List *publishList = NIL; + + if (publicationForm->pubinsert) + { + publishList = lappend(publishList, makeString("insert")); + } + + if (publicationForm->pubupdate) + { + publishList = lappend(publishList, makeString("update")); + } + + if (publicationForm->pubdelete) + { + publishList = lappend(publishList, makeString("delete")); + } + + if (publicationForm->pubtruncate) + { + publishList = lappend(publishList, makeString("truncate")); + } + + if (list_length(publishList) > 0) + { + StringInfo optionValue = makeStringInfo(); + AppendPublishOptionList(optionValue, publishList); + + DefElem *publishOption = makeDefElem("publish", + (Node *) makeString(optionValue->data), -1); + createPubStmt->options = lappend(createPubStmt->options, publishOption); + } + + + return createPubStmt; +} + + +/* + * AppendPublishOptionList appends a list of publication options in + * comma-separate form. + */ +static void +AppendPublishOptionList(StringInfo str, List *options) +{ + ListCell *stringCell = NULL; + foreach(stringCell, options) + { + const char *string = strVal(lfirst(stringCell)); + if (stringCell != list_head(options)) + { + appendStringInfoString(str, ", "); + } + + /* we cannot escape these strings */ + appendStringInfoString(str, string); + } +} + + +#if (PG_VERSION_NUM >= PG_VERSION_15) + +/* + * BuildPublicationRelationObjSpec returns a PublicationObjSpec that + * can be included in a CREATE or ALTER PUBLICATION statement. + */ +static PublicationObjSpec * +BuildPublicationRelationObjSpec(Oid relationId, Oid publicationId, + bool tableOnly) +{ + HeapTuple pubRelationTuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relationId), + ObjectIdGetDatum(publicationId)); + if (!HeapTupleIsValid(pubRelationTuple)) + { + ereport(ERROR, (errmsg("cannot find relation with oid %d in publication " + "with oid %d", relationId, publicationId))); + } + + List *columnNameList = NIL; + Node *whereClause = NULL; + + /* build the column list */ + if (!tableOnly) + { + bool isNull = false; + Datum attributesDatum = SysCacheGetAttr(PUBLICATIONRELMAP, pubRelationTuple, + Anum_pg_publication_rel_prattrs, + &isNull); + if (!isNull) + { + ArrayType *attributesArray = DatumGetArrayTypeP(attributesDatum); + int attributeCount = ARR_DIMS(attributesArray)[0]; + int16 *elems = (int16 *) ARR_DATA_PTR(attributesArray); + + for (int attNumIndex = 0; attNumIndex < attributeCount; attNumIndex++) + { + AttrNumber attributeNumber = elems[attNumIndex]; + char *columnName = get_attname(relationId, attributeNumber, false); + + columnNameList = lappend(columnNameList, makeString(columnName)); + } + } + + /* build the WHERE clause */ + Datum whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, pubRelationTuple, + Anum_pg_publication_rel_prqual, + &isNull); + if (!isNull) + { + /* + * We use the already-transformed parse tree form here, which does + * not match regular CreatePublicationStmt + */ + whereClause = stringToNode(TextDatumGetCString(whereClauseDatum)); + } + } + + ReleaseSysCache(pubRelationTuple); + + char *schemaName = get_namespace_name(get_rel_namespace(relationId)); + char *tableName = get_rel_name(relationId); + RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1); + + /* build the FOR TABLE */ + PublicationTable *publicationTable = + makeNode(PublicationTable); + publicationTable->relation = rangeVar; + publicationTable->whereClause = whereClause; + publicationTable->columns = columnNameList; + + PublicationObjSpec *publicationObject = makeNode(PublicationObjSpec); + publicationObject->pubobjtype = PUBLICATIONOBJ_TABLE; + publicationObject->pubtable = publicationTable; + publicationObject->name = NULL; + publicationObject->location = -1; + + return publicationObject; +} + + +#endif + + +/* + * PreprocessAlterPublicationStmt handles ALTER PUBLICATION statements + * in a way that is mostly similar to PreprocessAlterDistributedObjectStmt, + * except we do not ensure sequential mode (publications do not interact with + * shards) and can handle NULL deparse commands for ALTER PUBLICATION commands + * that only involve local tables. + */ +List * +PreprocessAlterPublicationStmt(Node *stmt, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + List *addresses = GetObjectAddressListFromParseTree(stmt, false, false); + + /* the code-path only supports a single object */ + Assert(list_length(addresses) == 1); + + if (!ShouldPropagateAnyObject(addresses)) + { + return NIL; + } + + EnsureCoordinator(); + QualifyTreeNode(stmt); + + const char *sql = DeparseTreeNode((Node *) stmt); + if (sql == NULL) + { + /* + * Deparsing logic decided that there is nothing to propagate, e.g. + * because the command only concerns local tables. + */ + return NIL; + } + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + +/* + * GetAlterPublicationDDLCommandsForTable gets a list of ALTER PUBLICATION .. ADD/DROP + * commands for the given table. + * + * If isAdd is true, it return ALTER PUBLICATION .. ADD TABLE commands for all + * publications. + * + * Otherwise, it returns ALTER PUBLICATION .. DROP TABLE commands for all + * publications. + */ +List * +GetAlterPublicationDDLCommandsForTable(Oid relationId, bool isAdd) +{ + List *commands = NIL; + + List *publicationIds = GetRelationPublications(relationId); + Oid publicationId = InvalidOid; + + foreach_oid(publicationId, publicationIds) + { + char *command = GetAlterPublicationTableDDLCommand(publicationId, + relationId, isAdd); + + commands = lappend(commands, command); + } + + return commands; +} + + +/* + * GetAlterPublicationTableDDLCommand generates an ALTer PUBLICATION .. ADD/DROP TABLE + * command for the given publication and relation ID. + * + * If isAdd is true, it return an ALTER PUBLICATION .. ADD TABLE command. + * Otherwise, it returns ALTER PUBLICATION .. DROP TABLE command. + */ +char * +GetAlterPublicationTableDDLCommand(Oid publicationId, Oid relationId, + bool isAdd) +{ + HeapTuple pubTuple = SearchSysCache1(PUBLICATIONOID, + ObjectIdGetDatum(publicationId)); + if (!HeapTupleIsValid(pubTuple)) + { + ereport(ERROR, (errmsg("cannot find publication with oid: %d", + publicationId))); + } + + Form_pg_publication pubForm = (Form_pg_publication) GETSTRUCT(pubTuple); + + AlterPublicationStmt *alterPubStmt = makeNode(AlterPublicationStmt); + alterPubStmt->pubname = pstrdup(NameStr(pubForm->pubname)); + + ReleaseSysCache(pubTuple); + +#if (PG_VERSION_NUM >= PG_VERSION_15) + bool tableOnly = !isAdd; + + /* since postgres 15, tables can have a column list and filter */ + PublicationObjSpec *publicationObject = + BuildPublicationRelationObjSpec(relationId, publicationId, tableOnly); + + alterPubStmt->pubobjects = lappend(alterPubStmt->pubobjects, publicationObject); + alterPubStmt->action = isAdd ? AP_AddObjects : AP_DropObjects; +#else + + /* before postgres 15, only full tables are supported */ + char *schemaName = get_namespace_name(get_rel_namespace(relationId)); + char *tableName = get_rel_name(relationId); + RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1); + + alterPubStmt->tables = lappend(alterPubStmt->tables, rangeVar); + alterPubStmt->tableAction = isAdd ? DEFELEM_ADD : DEFELEM_DROP; +#endif + + /* we take the WHERE clause from the catalog where it is already transformed */ + bool whereClauseNeedsTransform = false; + + /* + * We use these commands to restore publications before/after transforming a + * table, including transformations to/from local tables. + */ + bool includeLocalTables = true; + + char *command = DeparseAlterPublicationStmtExtended((Node *) alterPubStmt, + whereClauseNeedsTransform, + includeLocalTables); + + return command; +} + + +/* + * AlterPublicationOwnerCommand returns "ALTER PUBLICATION .. OWNER TO .." + * statement for the specified publication. + */ +static char * +AlterPublicationOwnerCommand(Oid publicationId) +{ + HeapTuple publicationTuple = + SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(publicationId)); + + if (!HeapTupleIsValid(publicationTuple)) + { + ereport(ERROR, (errmsg("cannot find publication with oid: %d", + publicationId))); + } + + Form_pg_publication publicationForm = + (Form_pg_publication) GETSTRUCT(publicationTuple); + + char *publicationName = NameStr(publicationForm->pubname); + Oid publicationOwnerId = publicationForm->pubowner; + + char *publicationOwnerName = GetUserNameFromId(publicationOwnerId, false); + + StringInfo alterCommand = makeStringInfo(); + appendStringInfo(alterCommand, "ALTER PUBLICATION %s OWNER TO %s", + quote_identifier(publicationName), + quote_identifier(publicationOwnerName)); + + ReleaseSysCache(publicationTuple); + + return alterCommand->data; +} + + +/* + * ShouldPropagateCreatePublication tests if we need to propagate a CREATE PUBLICATION + * statement. + */ +static bool +ShouldPropagateCreatePublication(CreatePublicationStmt *stmt) +{ + if (!ShouldPropagate()) + { + return false; + } + + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return false; + } + + return true; +} + + +/* + * AlterPublicationStmtObjectAddress generates the object address for the + * publication altered by a regular ALTER PUBLICATION .. statement. + */ +List * +AlterPublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess) +{ + AlterPublicationStmt *stmt = castNode(AlterPublicationStmt, node); + + return ObjectAddressForPublicationName(stmt->pubname, missingOk); +} + + +/* + * AlterPublicationOwnerStmtObjectAddress generates the object address for the + * publication altered by the given ALTER PUBLICATION .. OWNER TO statement. + */ +List * +AlterPublicationOwnerStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess) +{ + AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); + + return ObjectAddressForPublicationName(strVal(stmt->object), missingOk); +} + + +/* + * CreatePublicationStmtObjectAddress generates the object address for the + * publication created by the given CREATE PUBLICATION statement. + */ +List * +CreatePublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostProcess) +{ + CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node); + + return ObjectAddressForPublicationName(stmt->pubname, missingOk); +} + + +/* + * RenamePublicationStmtObjectAddress generates the object address for the + * publication altered by the given ALter PUBLICATION .. RENAME TO statement. + */ +List * +RenamePublicationStmtObjectAddress(Node *node, bool missingOk, bool isPostprocess) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + + return ObjectAddressForPublicationName(strVal(stmt->object), missingOk); +} + + +/* + * ObjectAddressForPublicationName returns the object address for a given publication + * name. + */ +static List * +ObjectAddressForPublicationName(char *publicationName, bool missingOk) +{ + Oid publicationId = InvalidOid; + + HeapTuple publicationTuple = + SearchSysCache1(PUBLICATIONNAME, CStringGetDatum(publicationName)); + if (HeapTupleIsValid(publicationTuple)) + { + Form_pg_publication publicationForm = + (Form_pg_publication) GETSTRUCT(publicationTuple); + publicationId = publicationForm->oid; + + ReleaseSysCache(publicationTuple); + } + else if (!missingOk) + { + /* it should have just been created */ + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication \"%s\" does not exist", publicationName))); + } + + ObjectAddress *address = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*address, PublicationRelationId, publicationId); + + return list_make1(address); +} diff --git a/src/backend/distributed/deparser/deparse_publication_stmts.c b/src/backend/distributed/deparser/deparse_publication_stmts.c new file mode 100644 index 000000000..deb8e7285 --- /dev/null +++ b/src/backend/distributed/deparser/deparse_publication_stmts.c @@ -0,0 +1,690 @@ +/*------------------------------------------------------------------------- + * + * deparse_publication_stmts.c + * All routines to deparse publication statements. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/relation.h" +#include "catalog/namespace.h" +#include "commands/defrem.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "distributed/namespace_utils.h" +#include "lib/stringinfo.h" +#include "parser/parse_clause.h" +#include "parser/parse_collate.h" +#include "parser/parse_node.h" +#include "parser/parse_relation.h" +#include "nodes/value.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/ruleutils.h" + + +static void AppendCreatePublicationStmt(StringInfo buf, CreatePublicationStmt *stmt, + bool whereClauseNeedsTransform, + bool includeLocalTables); +#if (PG_VERSION_NUM >= PG_VERSION_15) +static bool AppendPublicationObjects(StringInfo buf, List *publicationObjects, + bool whereClauseNeedsTransform, + bool includeLocalTables); +static void AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName, + Node *whereClause, + bool whereClauseNeedsTransform); +static void AppendAlterPublicationAction(StringInfo buf, AlterPublicationAction action); +#else +static bool AppendTables(StringInfo buf, List *tables, bool includeLocalTables); +static void AppendDefElemAction(StringInfo buf, DefElemAction action); +#endif +static bool AppendAlterPublicationStmt(StringInfo buf, AlterPublicationStmt *stmt, + bool whereClauseNeedsTransform, + bool includeLocalTables); +static void AppendDropPublicationStmt(StringInfo buf, DropStmt *stmt); +static void AppendRenamePublicationStmt(StringInfo buf, RenameStmt *stmt); +static void AppendAlterPublicationOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); +static void AppendPublicationOptions(StringInfo stringBuffer, List *optionList); +static void AppendIdentifierList(StringInfo buf, List *objects); + + +/* + * DeparseCreatePublicationStmt builds and returns a string representing a + * CreatePublicationStmt. + */ +char * +DeparseCreatePublicationStmt(Node *node) +{ + /* regular deparsing function takes CREATE PUBLICATION from the parser */ + bool whereClauseNeedsTransform = false; + + /* for regular CREATE PUBLICATION we do not propagate local tables */ + bool includeLocalTables = false; + + return DeparseCreatePublicationStmtExtended(node, whereClauseNeedsTransform, + includeLocalTables); +} + + +/* + * DeparseCreatePublicationStmtExtended builds and returns a string representing a + * CreatePublicationStmt, which may have already-transformed expressions. + */ +char * +DeparseCreatePublicationStmtExtended(Node *node, bool whereClauseNeedsTransform, + bool includeLocalTables) +{ + CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node); + + StringInfoData str = { 0 }; + initStringInfo(&str); + + AppendCreatePublicationStmt(&str, stmt, whereClauseNeedsTransform, + includeLocalTables); + + return str.data; +} + + +/* + * AppendCreatePublicationStmt appends a string representing a + * CreatePublicationStmt to a buffer. + */ +static void +AppendCreatePublicationStmt(StringInfo buf, CreatePublicationStmt *stmt, + bool whereClauseNeedsTransform, + bool includeLocalTables) +{ + appendStringInfo(buf, "CREATE PUBLICATION %s", + quote_identifier(stmt->pubname)); + + if (stmt->for_all_tables) + { + appendStringInfoString(buf, " FOR ALL TABLES"); + } +#if (PG_VERSION_NUM >= PG_VERSION_15) + else if (stmt->pubobjects != NIL) + { + bool hasObjects = false; + PublicationObjSpec *publicationObject = NULL; + + /* + * Check whether there are objects to propagate, mainly to know whether + * we should include "FOR". + */ + foreach_ptr(publicationObject, stmt->pubobjects) + { + if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE) + { + /* FOR TABLE ... */ + PublicationTable *publicationTable = publicationObject->pubtable; + + if (includeLocalTables || + IsCitusTableRangeVar(publicationTable->relation, NoLock, false)) + { + hasObjects = true; + break; + } + } + else + { + hasObjects = true; + break; + } + } + + if (hasObjects) + { + appendStringInfoString(buf, " FOR"); + AppendPublicationObjects(buf, stmt->pubobjects, whereClauseNeedsTransform, + includeLocalTables); + } + } +#else + else if (stmt->tables != NIL) + { + bool hasTables = false; + RangeVar *rangeVar = NULL; + + /* + * Check whether there are tables to propagate, mainly to know whether + * we should include "FOR". + */ + foreach_ptr(rangeVar, stmt->tables) + { + if (includeLocalTables || IsCitusTableRangeVar(rangeVar, NoLock, false)) + { + hasTables = true; + break; + } + } + + if (hasTables) + { + appendStringInfoString(buf, " FOR"); + AppendTables(buf, stmt->tables, includeLocalTables); + } + } +#endif + + if (stmt->options != NIL) + { + appendStringInfoString(buf, " WITH ("); + AppendPublicationOptions(buf, stmt->options); + appendStringInfoString(buf, ")"); + } +} + + +#if (PG_VERSION_NUM >= PG_VERSION_15) + +/* + * AppendPublicationObjects appends a string representing a list of publication + * objects to a buffer. + * + * For instance: TABLE users, departments, TABLES IN SCHEMA production + */ +static bool +AppendPublicationObjects(StringInfo buf, List *publicationObjects, + bool whereClauseNeedsTransform, + bool includeLocalTables) +{ + PublicationObjSpec *publicationObject = NULL; + bool appendedObject = false; + + foreach_ptr(publicationObject, publicationObjects) + { + if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE) + { + /* FOR TABLE ... */ + PublicationTable *publicationTable = publicationObject->pubtable; + RangeVar *rangeVar = publicationTable->relation; + char *schemaName = rangeVar->schemaname; + char *tableName = rangeVar->relname; + + if (!includeLocalTables && !IsCitusTableRangeVar(rangeVar, NoLock, false)) + { + /* do not propagate local tables */ + continue; + } + + if (schemaName != NULL) + { + /* qualified table name */ + appendStringInfo(buf, "%s TABLE %s", + appendedObject ? "," : "", + quote_qualified_identifier(schemaName, tableName)); + } + else + { + /* unqualified table name */ + appendStringInfo(buf, "%s TABLE %s", + appendedObject ? "," : "", + quote_identifier(tableName)); + } + + if (publicationTable->columns != NIL) + { + appendStringInfoString(buf, " ("); + AppendIdentifierList(buf, publicationTable->columns); + appendStringInfoString(buf, ")"); + } + + if (publicationTable->whereClause != NULL) + { + appendStringInfoString(buf, " WHERE ("); + + AppendWhereClauseExpression(buf, rangeVar, + publicationTable->whereClause, + whereClauseNeedsTransform); + + appendStringInfoString(buf, ")"); + } + } + else + { + /* FOR TABLES IN SCHEMA */ + char *schemaName = publicationObject->name; + + if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA) + { + List *searchPath = fetch_search_path(false); + if (searchPath == NIL) + { + ereport(ERROR, errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for " + "CURRENT_SCHEMA")); + } + + schemaName = get_namespace_name(linitial_oid(searchPath)); + } + + appendStringInfo(buf, "%s TABLES IN SCHEMA %s", + appendedObject ? "," : "", + quote_identifier(schemaName)); + } + + appendedObject = true; + } + + return appendedObject; +} + + +/* + * AppendWhereClauseExpression appends a deparsed expression that can + * contain a filter on the given table. If whereClauseNeedsTransform is set + * the expression is first tranformed. + */ +static void +AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName, + Node *whereClause, bool whereClauseNeedsTransform) +{ + Relation relation = relation_openrv(tableName, AccessShareLock); + + if (whereClauseNeedsTransform) + { + ParseState *pstate = make_parsestate(NULL); + pstate->p_sourcetext = ""; + ParseNamespaceItem *nsitem = addRangeTableEntryForRelation(pstate, + relation, + AccessShareLock, NULL, + false, false); + addNSItemToQuery(pstate, nsitem, false, true, true); + + whereClause = transformWhereClause(pstate, + copyObject(whereClause), + EXPR_KIND_WHERE, + "PUBLICATION WHERE"); + + assign_expr_collations(pstate, whereClause); + } + + List *relationContext = deparse_context_for(tableName->relname, relation->rd_id); + + PushOverrideEmptySearchPath(CurrentMemoryContext); + char *whereClauseString = deparse_expression(whereClause, + relationContext, + true, true); + PopOverrideSearchPath(); + + appendStringInfoString(buf, whereClauseString); + + relation_close(relation, AccessShareLock); +} + + +#else + +/* + * AppendPublicationObjects appends a string representing a list of publication + * objects to a buffer. + * + * For instance: TABLE users, departments + */ +static bool +AppendTables(StringInfo buf, List *tables, bool includeLocalTables) +{ + RangeVar *rangeVar = NULL; + bool appendedObject = false; + + foreach_ptr(rangeVar, tables) + { + if (!includeLocalTables && + !IsCitusTableRangeVar(rangeVar, NoLock, false)) + { + /* do not propagate local tables */ + continue; + } + + char *schemaName = rangeVar->schemaname; + char *tableName = rangeVar->relname; + + if (schemaName != NULL) + { + /* qualified table name */ + appendStringInfo(buf, "%s %s", + appendedObject ? "," : " TABLE", + quote_qualified_identifier(schemaName, tableName)); + } + else + { + /* unqualified table name */ + appendStringInfo(buf, "%s %s", + appendedObject ? "," : " TABLE", + quote_identifier(tableName)); + } + + appendedObject = true; + } + + return appendedObject; +} + + +#endif + + +/* + * DeparseAlterPublicationSchemaStmt builds and returns a string representing + * an AlterPublicationStmt. + */ +char * +DeparseAlterPublicationStmt(Node *node) +{ + /* regular deparsing function takes ALTER PUBLICATION from the parser */ + bool whereClauseNeedsTransform = true; + + /* for regular ALTER PUBLICATION we do not propagate local tables */ + bool includeLocalTables = false; + + return DeparseAlterPublicationStmtExtended(node, whereClauseNeedsTransform, + includeLocalTables); +} + + +/* + * DeparseAlterPublicationStmtExtended builds and returns a string representing a + * AlterPublicationStmt, which may have already-transformed expressions. + */ +char * +DeparseAlterPublicationStmtExtended(Node *node, bool whereClauseNeedsTransform, + bool includeLocalTables) +{ + AlterPublicationStmt *stmt = castNode(AlterPublicationStmt, node); + StringInfoData str = { 0 }; + initStringInfo(&str); + + if (!AppendAlterPublicationStmt(&str, stmt, whereClauseNeedsTransform, + includeLocalTables)) + { + Assert(!includeLocalTables); + + /* + * When there are no objects to propagate, then there is no + * valid ALTER PUBLICATION to construct. + */ + return NULL; + } + + return str.data; +} + + +/* + * AppendAlterPublicationStmt appends a string representing an AlterPublicationStmt + * of the form ALTER PUBLICATION .. ADD/SET/DROP + */ +static bool +AppendAlterPublicationStmt(StringInfo buf, AlterPublicationStmt *stmt, + bool whereClauseNeedsTransform, + bool includeLocalTables) +{ + appendStringInfo(buf, "ALTER PUBLICATION %s", + quote_identifier(stmt->pubname)); + + if (stmt->options) + { + appendStringInfoString(buf, " SET ("); + AppendPublicationOptions(buf, stmt->options); + appendStringInfoString(buf, ")"); + + /* changing options cannot be combined with other actions */ + return true; + } + +#if (PG_VERSION_NUM >= PG_VERSION_15) + AppendAlterPublicationAction(buf, stmt->action); + return AppendPublicationObjects(buf, stmt->pubobjects, whereClauseNeedsTransform, + includeLocalTables); +#else + AppendDefElemAction(buf, stmt->tableAction); + return AppendTables(buf, stmt->tables, includeLocalTables); +#endif +} + + +#if (PG_VERSION_NUM >= PG_VERSION_15) + +/* + * AppendAlterPublicationAction appends a string representing an AlterPublicationAction + * to a buffer. + */ +static void +AppendAlterPublicationAction(StringInfo buf, AlterPublicationAction action) +{ + switch (action) + { + case AP_AddObjects: + { + appendStringInfoString(buf, " ADD"); + break; + } + + case AP_DropObjects: + { + appendStringInfoString(buf, " DROP"); + break; + } + + case AP_SetObjects: + { + appendStringInfoString(buf, " SET"); + break; + } + + default: + { + ereport(ERROR, (errmsg("unrecognized publication action: %d", action))); + } + } +} + + +#else + +/* + * AppendDefElemAction appends a string representing a DefElemAction + * to a buffer. + */ +static void +AppendDefElemAction(StringInfo buf, DefElemAction action) +{ + switch (action) + { + case DEFELEM_ADD: + { + appendStringInfoString(buf, " ADD"); + break; + } + + case DEFELEM_DROP: + { + appendStringInfoString(buf, " DROP"); + break; + } + + case DEFELEM_SET: + { + appendStringInfoString(buf, " SET"); + break; + } + + default: + { + ereport(ERROR, (errmsg("unrecognized publication action: %d", action))); + } + } +} + + +#endif + + +/* + * DeparseDropPublicationStmt builds and returns a string representing the DropStmt + */ +char * +DeparseDropPublicationStmt(Node *node) +{ + DropStmt *stmt = castNode(DropStmt, node); + StringInfoData str = { 0 }; + initStringInfo(&str); + + Assert(stmt->removeType == OBJECT_PUBLICATION); + + AppendDropPublicationStmt(&str, stmt); + + return str.data; +} + + +/* + * AppendDropPublicationStmt appends a string representing the DropStmt to a buffer + */ +static void +AppendDropPublicationStmt(StringInfo buf, DropStmt *stmt) +{ + appendStringInfoString(buf, "DROP PUBLICATION "); + if (stmt->missing_ok) + { + appendStringInfoString(buf, "IF EXISTS "); + } + AppendIdentifierList(buf, stmt->objects); + if (stmt->behavior == DROP_CASCADE) + { + appendStringInfoString(buf, " CASCADE"); + } +} + + +/* + * DeparseRenamePublicationStmt builds and returns a string representing the RenameStmt + */ +char * +DeparseRenamePublicationStmt(Node *node) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + StringInfoData str = { 0 }; + initStringInfo(&str); + + Assert(stmt->renameType == OBJECT_PUBLICATION); + + AppendRenamePublicationStmt(&str, stmt); + + return str.data; +} + + +/* + * AppendRenamePublicationStmt appends a string representing the RenameStmt to a buffer + */ +static void +AppendRenamePublicationStmt(StringInfo buf, RenameStmt *stmt) +{ + appendStringInfo(buf, "ALTER PUBLICATION %s RENAME TO %s;", + quote_identifier(strVal(stmt->object)), + quote_identifier(stmt->newname)); +} + + +/* + * DeparseAlterPublicationOwnerStmt builds and returns a string representing the AlterOwnerStmt + */ +char * +DeparseAlterPublicationOwnerStmt(Node *node) +{ + AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); + StringInfoData str = { 0 }; + initStringInfo(&str); + + Assert(stmt->objectType == OBJECT_PUBLICATION); + + AppendAlterPublicationOwnerStmt(&str, stmt); + + return str.data; +} + + +/* + * AppendAlterPublicationOwnerStmt appends a string representing the AlterOwnerStmt to a buffer + */ +static void +AppendAlterPublicationOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) +{ + Assert(stmt->objectType == OBJECT_PUBLICATION); + + appendStringInfo(buf, "ALTER PUBLICATION %s OWNER TO %s;", + quote_identifier(strVal(stmt->object)), + RoleSpecString(stmt->newowner, true)); +} + + +/* + * AppendPublicationOptions appends a string representing a list of publication opions. + */ +static void +AppendPublicationOptions(StringInfo stringBuffer, List *optionList) +{ + ListCell *optionCell = NULL; + bool firstOptionPrinted = false; + + foreach(optionCell, optionList) + { + DefElem *option = (DefElem *) lfirst(optionCell); + char *optionName = option->defname; + char *optionValue = defGetString(option); + NodeTag valueType = nodeTag(option->arg); + + if (firstOptionPrinted) + { + appendStringInfo(stringBuffer, ", "); + } + firstOptionPrinted = true; + + appendStringInfo(stringBuffer, "%s = ", + quote_identifier(optionName)); + +#if (PG_VERSION_NUM >= PG_VERSION_15) + if (valueType == T_Integer || valueType == T_Float || valueType == T_Boolean) +#else + if (valueType == T_Integer || valueType == T_Float) +#endif + { + /* string escaping is unnecessary for numeric types and can cause issues */ + appendStringInfo(stringBuffer, "%s", optionValue); + } + else + { + appendStringInfo(stringBuffer, "%s", quote_literal_cstr(optionValue)); + } + } +} + + +/* + * AppendIdentifierList appends a string representing a list of + * identifiers (of String type). + */ +static void +AppendIdentifierList(StringInfo buf, List *objects) +{ + ListCell *objectCell = NULL; + + foreach(objectCell, objects) + { + char *name = strVal(lfirst(objectCell)); + + if (objectCell != list_head(objects)) + { + appendStringInfo(buf, ", "); + } + + appendStringInfoString(buf, quote_identifier(name)); + } +} diff --git a/src/backend/distributed/deparser/qualify_publication_stmt.c b/src/backend/distributed/deparser/qualify_publication_stmt.c new file mode 100644 index 000000000..3231fe363 --- /dev/null +++ b/src/backend/distributed/deparser/qualify_publication_stmt.c @@ -0,0 +1,119 @@ +/*------------------------------------------------------------------------- + * + * qualify_publication_stmt.c + * Functions specialized in fully qualifying all publication statements. These + * functions are dispatched from qualify.c + * + * Copyright (c), Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/namespace.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "nodes/nodes.h" +#include "utils/guc.h" +#include "utils/lsyscache.h" + +#if (PG_VERSION_NUM >= PG_VERSION_15) +static void QualifyPublicationObjects(List *publicationObjects); +#else +static void QualifyTables(List *tables); +#endif +static void QualifyPublicationRangeVar(RangeVar *publication); + + +/* + * QualifyCreatePublicationStmt quailifies the publication names of the + * CREATE PUBLICATION statement. + */ +void +QualifyCreatePublicationStmt(Node *node) +{ + CreatePublicationStmt *stmt = castNode(CreatePublicationStmt, node); + +#if (PG_VERSION_NUM >= PG_VERSION_15) + QualifyPublicationObjects(stmt->pubobjects); +#else + QualifyTables(stmt->tables); +#endif +} + + +#if (PG_VERSION_NUM >= PG_VERSION_15) + +/* + * QualifyPublicationObjects ensures all table names in a list of + * publication objects are fully qualified. + */ +static void +QualifyPublicationObjects(List *publicationObjects) +{ + PublicationObjSpec *publicationObject = NULL; + + foreach_ptr(publicationObject, publicationObjects) + { + if (publicationObject->pubobjtype == PUBLICATIONOBJ_TABLE) + { + /* FOR TABLE ... */ + PublicationTable *publicationTable = publicationObject->pubtable; + + QualifyPublicationRangeVar(publicationTable->relation); + } + } +} + + +#else + +/* + * QualifyTables ensures all table names in a list are fully qualified. + */ +static void +QualifyTables(List *tables) +{ + RangeVar *rangeVar = NULL; + + foreach_ptr(rangeVar, tables) + { + QualifyPublicationRangeVar(rangeVar); + } +} + + +#endif + + +/* + * QualifyPublicationObjects ensures all table names in a list of + * publication objects are fully qualified. + */ +void +QualifyAlterPublicationStmt(Node *node) +{ + AlterPublicationStmt *stmt = castNode(AlterPublicationStmt, node); + +#if (PG_VERSION_NUM >= PG_VERSION_15) + QualifyPublicationObjects(stmt->pubobjects); +#else + QualifyTables(stmt->tables); +#endif +} + + +/* + * QualifyPublicationRangeVar qualifies the given publication RangeVar if it is not qualified. + */ +static void +QualifyPublicationRangeVar(RangeVar *publication) +{ + if (publication->schemaname == NULL) + { + Oid publicationOid = RelnameGetRelid(publication->relname); + Oid schemaOid = get_rel_namespace(publicationOid); + publication->schemaname = get_namespace_name(schemaOid); + } +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index a0063adc8..04cb39a58 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -802,6 +802,11 @@ GetObjectTypeString(ObjectType objType) return "function"; } + case OBJECT_PUBLICATION: + { + return "publication"; + } + case OBJECT_SCHEMA: { return "schema"; diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index a67c8fed0..983b45e74 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -132,6 +132,7 @@ typedef struct ViewDependencyNode static List * GetRelationSequenceDependencyList(Oid relationId); static List * GetRelationFunctionDependencyList(Oid relationId); static List * GetRelationTriggerFunctionDependencyList(Oid relationId); +static List * GetPublicationRelationsDependencyList(Oid relationId); static List * GetRelationStatsSchemaDependencyList(Oid relationId); static List * GetRelationIndicesDependencyList(Oid relationId); static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid objectId); @@ -722,6 +723,11 @@ SupportedDependencyByCitus(const ObjectAddress *address) return true; } + case OCLASS_PUBLICATION: + { + return true; + } + case OCLASS_TSCONFIG: { return true; @@ -1656,6 +1662,36 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe List *ruleRefDepList = GetViewRuleReferenceDependencyList(relationId); result = list_concat(result, ruleRefDepList); } + + break; + } + + case PublicationRelationId: + { + Oid publicationId = target.objectId; + + /* + * Publications do not depend directly on relations, because dropping + * the relation will only remove it from the publications. However, + * we add a dependency to ensure the relation is created first when + * adding a node. + */ + List *relationDependencyList = + GetPublicationRelationsDependencyList(publicationId); + result = list_concat(result, relationDependencyList); + + /* + * As of PostgreSQL 15, the same applies to schemas. + */ +#if PG_VERSION_NUM >= PG_VERSION_15 + List *schemaIdList = + GetPublicationSchemas(publicationId); + List *schemaDependencyList = + CreateObjectAddressDependencyDefList(NamespaceRelationId, schemaIdList); + result = list_concat(result, schemaDependencyList); +#endif + + break; } default: @@ -1923,6 +1959,33 @@ GetRelationTriggerFunctionDependencyList(Oid relationId) } +/* + * GetPublicationRelationsDependencyList creates a list of ObjectAddressDependencies for + * a publication on the Citus relations it contains. This helps make sure we distribute + * Citus tables before local tables. + */ +static List * +GetPublicationRelationsDependencyList(Oid publicationId) +{ + List *allRelationIds = GetPublicationRelations(publicationId, PUBLICATION_PART_ROOT); + List *citusRelationIds = NIL; + + Oid relationId = InvalidOid; + + foreach_oid(relationId, allRelationIds) + { + if (!IsCitusTable(relationId)) + { + continue; + } + + citusRelationIds = lappend_oid(citusRelationIds, relationId); + } + + return CreateObjectAddressDependencyDefList(RelationRelationId, citusRelationIds); +} + + /* * GetTypeConstraintDependencyDefinition creates a list of constraint dependency * definitions for a given type diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index b7753108c..1e73eef6b 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -590,6 +590,18 @@ IsCitusTable(Oid relationId) } +/* + * IsCitusTableRangeVar returns whether the table named in the given + * rangeVar is a Citus table. + */ +bool +IsCitusTableRangeVar(RangeVar *rangeVar, LOCKMODE lockMode, bool missingOK) +{ + Oid relationId = RangeVarGetRelid(rangeVar, lockMode, missingOK); + return IsCitusTable(relationId); +} + + /* * IsCitusTableViaCatalog returns whether the given relation is a * distributed table or not. diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index df9104efd..01f245db1 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -100,6 +100,7 @@ static bool HasMetadataWorkers(void); static void CreateShellTableOnWorkers(Oid relationId); static void CreateTableMetadataOnWorkers(Oid relationId); static void CreateDependingViewsOnWorkers(Oid relationId); +static void AddTableToPublications(Oid relationId); static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void); static bool ShouldSyncTableMetadataInternal(bool hashDistributed, bool citusTableWithNoDistKey); @@ -302,7 +303,8 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort) * Our definition of metadata includes the shell table and its inter relations with * other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard * and pg_dist_shard placement entries. This function also propagates the views that - * depend on the given relation, to the metadata workers. + * depend on the given relation, to the metadata workers, and adds the relation to + * the appropriate publications. */ void SyncCitusTableMetadata(Oid relationId) @@ -319,6 +321,7 @@ SyncCitusTableMetadata(Oid relationId) } CreateDependingViewsOnWorkers(relationId); + AddTableToPublications(relationId); } @@ -364,6 +367,49 @@ CreateDependingViewsOnWorkers(Oid relationId) } +/* + * AddTableToPublications adds the table to a publication on workers with metadata. + */ +static void +AddTableToPublications(Oid relationId) +{ + List *publicationIds = GetRelationPublications(relationId); + if (publicationIds == NIL) + { + return; + } + + Oid publicationId = InvalidOid; + + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + + foreach_oid(publicationId, publicationIds) + { + ObjectAddress *publicationAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*publicationAddress, PublicationRelationId, publicationId); + List *addresses = list_make1(publicationAddress); + + if (!ShouldPropagateAnyObject(addresses)) + { + /* skip non-distributed publications */ + continue; + } + + /* ensure schemas exist */ + EnsureAllObjectDependenciesExistOnAllNodes(addresses); + + bool isAdd = true; + char *alterPublicationCommand = + GetAlterPublicationTableDDLCommand(publicationId, relationId, isAdd); + + /* send ALTER PUBLICATION .. ADD to workers with metadata */ + SendCommandToWorkersWithMetadata(alterPublicationCommand); + } + + SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); +} + + /* * EnsureSequentialModeMetadataOperations makes sure that the current transaction is * already in sequential mode, or can still safely be put in sequential mode, diff --git a/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c b/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c index 00c2da620..bcd74fbbc 100644 --- a/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c +++ b/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c @@ -425,6 +425,7 @@ ErrorIfCurrentUserCanNotDistributeObject(char *textType, ObjectType type, case OBJECT_COLLATION: case OBJECT_VIEW: case OBJECT_ROLE: + case OBJECT_PUBLICATION: { check_object_ownership(userId, type, *addr, node, *relation); break; diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 86b40bfba..6a6db2c7f 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1338,7 +1338,9 @@ CreatePublications(MultiConnection *connection, worker->groupId, CLEANUP_ALWAYS); + ExecuteCriticalRemoteCommand(connection, DISABLE_DDL_PROPAGATION); ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); + ExecuteCriticalRemoteCommand(connection, ENABLE_DDL_PROPAGATION); pfree(createPublicationCommand->data); pfree(createPublicationCommand); } diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 148678da7..f2ac9f3e2 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -1,4 +1,9 @@ -- citus--11.2-1--11.3-1 #include "udfs/repl_origin_helper/11.3-1.sql" --- bump version to 11.3-1 +ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index; +ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index; +ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index; +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY USING INDEX pg_dist_rebalance_strategy_name_key; +ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY USING INDEX pg_dist_shard_shardid_index; +ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_transaction_unique_constraint; diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 02a92f967..ae5b799e4 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -1,4 +1,12 @@ -- citus--11.3-1--11.2-1 + DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active(); + +ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING; +ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING; +ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING; +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY NOTHING; +ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING; +ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING; diff --git a/src/backend/distributed/worker/worker_create_or_replace.c b/src/backend/distributed/worker/worker_create_or_replace.c index 572cf1420..b40f712b5 100644 --- a/src/backend/distributed/worker/worker_create_or_replace.c +++ b/src/backend/distributed/worker/worker_create_or_replace.c @@ -35,8 +35,22 @@ #include "distributed/worker_create_or_replace.h" #include "distributed/worker_protocol.h" + +/* + * OnCollisionAction describes what to do when the created object + * and existing object do not match. + */ +typedef enum OnCollisionAction +{ + ON_COLLISION_RENAME, + ON_COLLISION_DROP +} OnCollisionAction; + + static List * CreateStmtListByObjectAddress(const ObjectAddress *address); static bool CompareStringList(List *list1, List *list2); +static OnCollisionAction GetOnCollisionAction(const ObjectAddress *address); + PG_FUNCTION_INFO_V1(worker_create_or_replace_object); PG_FUNCTION_INFO_V1(worker_create_or_replace_object_array); @@ -192,7 +206,8 @@ WorkerCreateOrReplaceObject(List *sqlStatements) /* * Object with name from statement is already found locally, check if states are * identical. If objects differ we will rename the old object (non- destructively) - * as to make room to create the new object according to the spec sent. + * or drop it (if safe) as to make room to create the new object according to the + * spec sent. */ /* @@ -213,11 +228,22 @@ WorkerCreateOrReplaceObject(List *sqlStatements) return false; } - char *newName = GenerateBackupNameForCollision(address); + Node *utilityStmt = NULL; - RenameStmt *renameStmt = CreateRenameStatement(address, newName); - const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt); - ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt, + if (GetOnCollisionAction(address) == ON_COLLISION_DROP) + { + /* drop the existing object */ + utilityStmt = (Node *) CreateDropStmt(address); + } + else + { + /* rename the existing object */ + char *newName = GenerateBackupNameForCollision(address); + utilityStmt = (Node *) CreateRenameStatement(address, newName); + } + + const char *commandString = DeparseTreeNode(utilityStmt); + ProcessUtilityParseTree(utilityStmt, commandString, PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL); } @@ -286,6 +312,11 @@ CreateStmtListByObjectAddress(const ObjectAddress *address) return list_make1(GetFunctionDDLCommand(address->objectId, false)); } + case OCLASS_PUBLICATION: + { + return list_make1(CreatePublicationDDLCommand(address->objectId)); + } + case OCLASS_TSCONFIG: { List *stmts = GetCreateTextSearchConfigStatements(address); @@ -312,6 +343,37 @@ CreateStmtListByObjectAddress(const ObjectAddress *address) } +/* + * GetOnCollisionAction decides what to do when the object already exists. + */ +static OnCollisionAction +GetOnCollisionAction(const ObjectAddress *address) +{ + switch (getObjectClass(address)) + { + case OCLASS_PUBLICATION: + { + /* + * We prefer to drop publications because they can be + * harmful (cause update/delete failures) and are relatively + * safe to drop. + */ + return ON_COLLISION_DROP; + } + + case OCLASS_COLLATION: + case OCLASS_PROC: + case OCLASS_TSCONFIG: + case OCLASS_TSDICT: + case OCLASS_TYPE: + default: + { + return ON_COLLISION_RENAME; + } + } +} + + /* * GenerateBackupNameForCollision calculate a backup name for a given object by its * address. This name should be used when renaming an existing object before creating the @@ -362,6 +424,64 @@ GenerateBackupNameForCollision(const ObjectAddress *address) } +/* + * CreateDropPublicationStmt creates a DROP PUBLICATION statement for the + * publication at the given address. + */ +static DropStmt * +CreateDropPublicationStmt(const ObjectAddress *address) +{ + Assert(address->classId == PublicationRelationId); + + DropStmt *dropStmt = makeNode(DropStmt); + dropStmt->removeType = OBJECT_PUBLICATION; + dropStmt->behavior = DROP_RESTRICT; + + HeapTuple publicationTuple = + SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(address->objectId)); + + if (!HeapTupleIsValid(publicationTuple)) + { + ereport(ERROR, (errmsg("cannot find publication with oid: %d", + address->objectId))); + } + + Form_pg_publication publicationForm = + (Form_pg_publication) GETSTRUCT(publicationTuple); + + char *publicationName = NameStr(publicationForm->pubname); + dropStmt->objects = list_make1(makeString(publicationName)); + + ReleaseSysCache(publicationTuple); + + return dropStmt; +} + + +/* + * CreateDropStmt returns a DROP statement for the given object. + */ +DropStmt * +CreateDropStmt(const ObjectAddress *address) +{ + switch (getObjectClass(address)) + { + case OCLASS_PUBLICATION: + { + return CreateDropPublicationStmt(address); + } + + default: + { + break; + } + } + + ereport(ERROR, (errmsg("unsupported object to construct a drop statement"), + errdetail("unable to generate a parsetree for the drop"))); +} + + /* * CreateRenameTypeStmt creates a rename statement for a type based on its ObjectAddress. * The rename statement will rename the existing object on its address to the value diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index c3ec4fafb..20214921e 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -409,6 +409,24 @@ extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, u extern void AddRangeTableEntryToQueryCompat(ParseState *parseState, Relation relation); +/* publication.c - forward declarations */ +extern List * PostProcessCreatePublicationStmt(Node *node, const char *queryString); +extern List * CreatePublicationDDLCommandsIdempotent(const ObjectAddress *address); +extern char * CreatePublicationDDLCommand(Oid publicationId); +extern List * PreprocessAlterPublicationStmt(Node *stmt, const char *queryString, + ProcessUtilityContext processUtilityCtx); +extern List * GetAlterPublicationDDLCommandsForTable(Oid relationId, bool isAdd); +extern char * GetAlterPublicationTableDDLCommand(Oid publicationId, Oid relationId, + bool isAdd); +extern List * AlterPublicationOwnerStmtObjectAddress(Node *node, bool missingOk, + bool isPostProcess); +extern List * AlterPublicationStmtObjectAddress(Node *node, bool missingOk, + bool isPostProcess); +extern List * CreatePublicationStmtObjectAddress(Node *node, bool missingOk, + bool isPostProcess); +extern List * RenamePublicationStmtObjectAddress(Node *node, bool missingOk, + bool isPostProcess); + /* rename.c - forward declarations*/ extern List * PreprocessRenameStmt(Node *renameStmt, const char *renameCommand, ProcessUtilityContext processUtilityContext); @@ -657,7 +675,6 @@ extern List * PreprocessDropViewStmt(Node *node, const char *queryString, extern List * DropViewStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); extern char * CreateViewDDLCommand(Oid viewOid); extern List * GetViewCreationCommandsOfTable(Oid relationId); -extern List * GetViewCreationTableDDLCommandsOfTable(Oid relationId); extern char * AlterViewOwnerCommand(Oid viewOid); extern char * DeparseViewStmt(Node *node); extern char * DeparseDropViewStmt(Node *node); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 87704b628..0d4f605d8 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -210,6 +210,23 @@ extern char * DeparseAlterExtensionStmt(Node *stmt); /* forward declarations for deparse_database_stmts.c */ extern char * DeparseAlterDatabaseOwnerStmt(Node *node); +/* forward declaration for deparse_publication_stmts.c */ +extern char * DeparseCreatePublicationStmt(Node *stmt); +extern char * DeparseCreatePublicationStmtExtended(Node *node, + bool whereClauseNeedsTransform, + bool includeLocalTables); +extern char * DeparseAlterPublicationStmt(Node *stmt); +extern char * DeparseAlterPublicationStmtExtended(Node *stmt, + bool whereClauseNeedsTransform, + bool includeLocalTables); +extern char * DeparseAlterPublicationOwnerStmt(Node *stmt); +extern char * DeparseAlterPublicationSchemaStmt(Node *node); +extern char * DeparseDropPublicationStmt(Node *stmt); +extern char * DeparseRenamePublicationStmt(Node *node); + +extern void QualifyCreatePublicationStmt(Node *node); +extern void QualifyAlterPublicationStmt(Node *node); + /* forward declatations for deparse_text_search_stmts.c */ extern void QualifyAlterTextSearchConfigurationOwnerStmt(Node *node); extern void QualifyAlterTextSearchConfigurationSchemaStmt(Node *node); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index e7cb2514d..c23a047ec 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -147,6 +147,7 @@ extern char * GetTableTypeName(Oid tableId); extern void SetCreateCitusTransactionLevel(int val); extern int GetCitusCreationLevel(void); extern bool IsCitusTable(Oid relationId); +extern bool IsCitusTableRangeVar(RangeVar *rangeVar, LOCKMODE lockMode, bool missingOk); extern bool IsCitusTableViaCatalog(Oid relationId); extern char PgDistPartitionViaCatalog(Oid relationId); extern List * LookupDistShardTuples(Oid relationId); diff --git a/src/include/distributed/worker_create_or_replace.h b/src/include/distributed/worker_create_or_replace.h index 148cee138..f0b1e8077 100644 --- a/src/include/distributed/worker_create_or_replace.h +++ b/src/include/distributed/worker_create_or_replace.h @@ -21,6 +21,7 @@ extern char * WrapCreateOrReplace(const char *sql); extern char * WrapCreateOrReplaceList(List *sqls); extern char * GenerateBackupNameForCollision(const ObjectAddress *address); +extern DropStmt * CreateDropStmt(const ObjectAddress *address); extern RenameStmt * CreateRenameStatement(const ObjectAddress *address, char *newName); #endif /* WORKER_CREATE_OR_REPLACE_H */ diff --git a/src/test/regress/enterprise_schedule b/src/test/regress/enterprise_schedule index 84341d23d..55791d43a 100644 --- a/src/test/regress/enterprise_schedule +++ b/src/test/regress/enterprise_schedule @@ -19,6 +19,7 @@ test: citus_local_tables_ent test: remove_coordinator # -------- +test: publication test: logical_replication test: multi_create_table test: multi_create_table_superuser diff --git a/src/test/regress/expected/logical_replication.out b/src/test/regress/expected/logical_replication.out index 0b2585bfb..79108dd11 100644 --- a/src/test/regress/expected/logical_replication.out +++ b/src/test/regress/expected/logical_replication.out @@ -25,7 +25,9 @@ NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipp -- This allows us to test the cleanup logic at the start of the shard move. \c - - - :worker_1_port SET search_path TO logical_replication; +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_6830000; +RESET citus.enable_ddl_propagation; \c - - - :master_port SET search_path TO logical_replication; CREATE TABLE dist_6830000( @@ -155,6 +157,13 @@ SELECT count(*) from dist; 100 (1 row) +DROP PUBLICATION citus_shard_move_publication_:postgres_oid; +SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid); + pg_drop_replication_slot +--------------------------------------------------------------------- + +(1 row) + \c - - - :worker_2_port SET search_path TO logical_replication; SELECT count(*) from pg_subscription; @@ -188,3 +197,9 @@ ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE; ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE); DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid; DROP SCHEMA logical_replication CASCADE; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/metadata_sync_helpers.out b/src/test/regress/expected/metadata_sync_helpers.out index dc526afb0..ae2f9a04b 100644 --- a/src/test/regress/expected/metadata_sync_helpers.out +++ b/src/test/regress/expected/metadata_sync_helpers.out @@ -713,13 +713,16 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse - CREATE TABLE publication_test_table(id int); - CREATE PUBLICATION publication_test FOR TABLE publication_test_table; + CREATE OPERATOR === ( + LEFTARG = int, + RIGHTARG = int, + FUNCTION = int4eq + ); SET ROLE metadata_sync_helper_role; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) - AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false)) + AS (VALUES ('operator', ARRAY['===']::text[], ARRAY['int','int']::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data; -ERROR: publication object can not be distributed by Citus +ERROR: operator object can not be distributed by Citus ROLLBACK; -- Show that citus_internal_add_object_metadata checks the priviliges BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; diff --git a/src/test/regress/expected/multi_multiuser_auth.out b/src/test/regress/expected/multi_multiuser_auth.out index 15d34b563..4b7c6fcc7 100644 --- a/src/test/regress/expected/multi_multiuser_auth.out +++ b/src/test/regress/expected/multi_multiuser_auth.out @@ -22,7 +22,7 @@ SELECT nodeid AS worker_1_id FROM pg_dist_node WHERE nodename = 'localhost' AND SELECT nodeid AS worker_2_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_2_port; worker_2_id --------------------------------------------------------------------- - 18 + 35 (1 row) \gset diff --git a/src/test/regress/expected/multi_poolinfo_usage.out b/src/test/regress/expected/multi_poolinfo_usage.out index c5e97ec95..b428409ff 100644 --- a/src/test/regress/expected/multi_poolinfo_usage.out +++ b/src/test/regress/expected/multi_poolinfo_usage.out @@ -16,7 +16,7 @@ SELECT nodeid AS worker_1_id FROM pg_dist_node WHERE nodename = 'localhost' AND SELECT nodeid AS worker_2_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_2_port; worker_2_id --------------------------------------------------------------------- - 18 + 35 (1 row) \gset diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out new file mode 100644 index 000000000..a267cbe71 --- /dev/null +++ b/src/test/regress/expected/publication.out @@ -0,0 +1,379 @@ +CREATE SCHEMA publication; +CREATE SCHEMA "publication-1"; +SET search_path TO publication; +SET citus.shard_replication_factor TO 1; +-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently +SELECT citus_set_coordinator_host('localhost', :master_port); + citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; +\c - - - :worker_1_port +SET citus.enable_ddl_propagation TO off; +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; +\c - - - :worker_2_port +SET citus.enable_ddl_propagation TO off; +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; +-- create some publications with conflicting names on worker node +-- publication will be different from coordinator +CREATE PUBLICATION "pub-all"; +-- publication will be same as coordinator +CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');; +\c - - - :master_port +SET search_path TO publication; +SET citus.shard_replication_factor TO 1; +-- do not create publications on worker 2 initially +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- create a non-distributed publication +SET citus.enable_ddl_propagation TO off; +CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete'); +RESET citus.enable_ddl_propagation; +ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate'); +-- create regular, distributed publications +CREATE PUBLICATION pubempty; +CREATE PUBLICATION pubinsertonly WITH (publish = 'insert'); +CREATE PUBLICATION "pub-all" FOR ALL TABLES; +CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update'); +-- add worker 2 with publications +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- Check publications on all the nodes, if we see the same publication name twice then its definition differs +-- Note that publications are special in the sense that the coordinator object might differ from +-- worker objects due to the presence of regular tables. +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); + SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update'')'); + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubempty WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubinsertonly WITH (publish_via_partition_root = ''false'', publish = ''insert'')'); +(4 rows) + +CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml); +CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int); +CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int); +-- various operations on a publication with only local tables +CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate'); +ALTER PUBLICATION pubtables_orig DROP TABLE test; +ALTER PUBLICATION pubtables_orig ADD TABLE test; +-- publication will be empty on worker nodes, since all tables are local +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables_orig WITH (publish_via_partition_root = ''false'', publish = ''insert, truncate'')'); +(1 row) + +-- distribute a table, creating a mixed publication +SELECT create_distributed_table('test','x', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- some generic operations +ALTER PUBLICATION pubtables_orig RENAME TO pubtables; +ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete'); +ALTER PUBLICATION pubtables OWNER TO postgres; +ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete'); +ERROR: unrecognized value for publication option "publish": "inert" +ALTER PUBLICATION pubtables ADD TABLE notexist; +ERROR: relation "notexist" does not exist +-- operations with a distributed table +ALTER PUBLICATION pubtables DROP TABLE test; +ALTER PUBLICATION pubtables ADD TABLE test; +ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs"; +-- operations with a local table in a mixed publication +ALTER PUBLICATION pubtables DROP TABLE "test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "test-pubs"; +SELECT create_distributed_table('"test-pubs"', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- test and test-pubs will show up in worker nodes +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, TABLE publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete'')'); +(1 row) + +-- operations with a strangely named distributed table in a mixed publication +ALTER PUBLICATION pubtables DROP TABLE "test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "test-pubs"; +-- create a publication with distributed and local tables +DROP PUBLICATION pubtables; +CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs"; +-- change distributed tables +SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true); +NOTICE: creating a new table for publication.test +NOTICE: moving the data of publication.test +NOTICE: dropping the old publication.test +NOTICE: renaming the new table to publication.test +NOTICE: creating a new table for publication."test-pubs" +NOTICE: moving the data of publication."test-pubs" +NOTICE: dropping the old publication."test-pubs" +NOTICE: renaming the new table to publication."test-pubs" + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('test'); +NOTICE: creating a new table for publication.test +NOTICE: moving the data of publication.test +NOTICE: dropping the old publication.test +NOTICE: renaming the new table to publication.test + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('test'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table_concurrently('test', 'x'); + create_distributed_table_concurrently +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('"test-pubs"'); +NOTICE: creating a new table for publication."test-pubs" +NOTICE: moving the data of publication."test-pubs" +NOTICE: dropping the old publication."test-pubs" +NOTICE: renaming the new table to publication."test-pubs" + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('"test-pubs"'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- publications are unchanged despite various tranformations +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, TABLE publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- partitioned table +CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a); +CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10); +ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey; +CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20); +ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a); +ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey; +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true'); +SELECT create_distributed_table('testpub_partitioned', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +DROP PUBLICATION pubpartitioned; +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true'); +-- add a partition +ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; +ERROR: malformed array literal: "" +DETAIL: Array value must start with "{" or dimension information. +-- make sure we can sync all the publication metadata +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +DROP PUBLICATION pubempty; +DROP PUBLICATION pubtables; +DROP PUBLICATION pubinsertonly; +DROP PUBLICATION "pub-all-insertupdateonly"; +DROP PUBLICATION "pub-all"; +DROP PUBLICATION pubpartitioned; +DROP PUBLICATION pubnotdistributed; +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +SET client_min_messages TO ERROR; +DROP SCHEMA publication CASCADE; +DROP SCHEMA "publication-1" CASCADE; +SELECT citus_remove_node('localhost', :master_port); +\q +\endif +-- recreate a mixed publication +CREATE PUBLICATION pubtables FOR TABLE test, "publication-1"."test-pubs"; +-- operations on an existing distributed table +ALTER PUBLICATION pubtables DROP TABLE test; +ALTER PUBLICATION pubtables ADD TABLE test (y); +ALTER PUBLICATION pubtables SET TABLE test WHERE (doc IS DOCUMENT); +ALTER PUBLICATION pubtables SET TABLE test WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE doc)); +ALTER PUBLICATION pubtables SET TABLE test WHERE (CASE x WHEN 5 THEN true ELSE false END); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test WHERE (CASE test.x WHEN 5 THEN true ELSE false END) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +ALTER PUBLICATION pubtables SET TABLE test ("column-1", x) WHERE (x > "column-1"), "publication-1"."test-pubs"; +-- operations on a local table +ALTER PUBLICATION pubtables DROP TABLE "publication-1"."test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "publication-1"."test-pubs" (y); +-- mixed operations +ALTER PUBLICATION pubtables SET TABLE test, TABLES IN SCHEMA "publication-1", TABLES IN SCHEMA current_schema; +ALTER PUBLICATION pubtables SET TABLE "publication-1"."test-pubs", test ("column-1", x) WHERE (x > "column-1"); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test (x, "column-1") WHERE ((test.x > test."column-1")) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- publication with schemas +CREATE PUBLICATION "pub-mix" FOR TABLE test, TABLES IN SCHEMA current_schema, TABLE "publication-1"."test-pubs", TABLES IN SCHEMA "publication-1"; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pub-mix%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-mix" FOR TABLES IN SCHEMA publication, TABLES IN SCHEMA "publication-1", TABLE publication.test WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- publication on a partitioned table +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned (a, b) WITH (publish_via_partition_root = 'true'); +ALTER PUBLICATION pubpartitioned SET (publish_via_partition_root = 1); +SELECT alter_distributed_table('testpub_partitioned', shard_count := 6, cascade_to_colocated := true); +NOTICE: converting the partitions of publication.testpub_partitioned +NOTICE: creating a new table for publication.testpub_partitioned_0 +NOTICE: moving the data of publication.testpub_partitioned_0 +NOTICE: dropping the old publication.testpub_partitioned_0 +NOTICE: renaming the new table to publication.testpub_partitioned_0 +NOTICE: creating a new table for publication.testpub_partitioned_1 +NOTICE: moving the data of publication.testpub_partitioned_1 +NOTICE: dropping the old publication.testpub_partitioned_1 +NOTICE: renaming the new table to publication.testpub_partitioned_1 +NOTICE: creating a new table for publication.testpub_partitioned +NOTICE: dropping the old publication.testpub_partitioned +NOTICE: renaming the new table to publication.testpub_partitioned +NOTICE: creating a new table for publication.test +NOTICE: moving the data of publication.test +NOTICE: dropping the old publication.test +NOTICE: renaming the new table to publication.test + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned (a, b) WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- make sure we propagate schema dependencies +SET citus.create_object_propagation TO 'deferred'; +BEGIN; +CREATE SCHEMA deptest; +END; +CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest; +RESET citus.create_object_propagation; +DROP SCHEMA deptest CASCADE; +-- make sure we can sync all the publication metadata +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +DROP PUBLICATION pubdep; +DROP PUBLICATION "pub-mix"; +DROP PUBLICATION pubtables; +DROP PUBLICATION pubpartitioned; +SET client_min_messages TO ERROR; +DROP SCHEMA publication CASCADE; +DROP SCHEMA "publication-1" CASCADE; +SELECT citus_remove_node('localhost', :master_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/publication_0.out b/src/test/regress/expected/publication_0.out new file mode 100644 index 000000000..617950a76 --- /dev/null +++ b/src/test/regress/expected/publication_0.out @@ -0,0 +1,273 @@ +CREATE SCHEMA publication; +CREATE SCHEMA "publication-1"; +SET search_path TO publication; +SET citus.shard_replication_factor TO 1; +-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently +SELECT citus_set_coordinator_host('localhost', :master_port); + citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; +\c - - - :worker_1_port +SET citus.enable_ddl_propagation TO off; +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; +\c - - - :worker_2_port +SET citus.enable_ddl_propagation TO off; +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; +-- create some publications with conflicting names on worker node +-- publication will be different from coordinator +CREATE PUBLICATION "pub-all"; +-- publication will be same as coordinator +CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');; +\c - - - :master_port +SET search_path TO publication; +SET citus.shard_replication_factor TO 1; +-- do not create publications on worker 2 initially +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- create a non-distributed publication +SET citus.enable_ddl_propagation TO off; +CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete'); +RESET citus.enable_ddl_propagation; +ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate'); +-- create regular, distributed publications +CREATE PUBLICATION pubempty; +CREATE PUBLICATION pubinsertonly WITH (publish = 'insert'); +CREATE PUBLICATION "pub-all" FOR ALL TABLES; +CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update'); +-- add worker 2 with publications +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- Check publications on all the nodes, if we see the same publication name twice then its definition differs +-- Note that publications are special in the sense that the coordinator object might differ from +-- worker objects due to the presence of regular tables. +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); + SELECT worker_create_or_replace_object('CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish_via_partition_root = ''false'', publish = ''insert, update'')'); + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubempty WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubinsertonly WITH (publish_via_partition_root = ''false'', publish = ''insert'')'); +(4 rows) + +CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml); +CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int); +CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int); +-- various operations on a publication with only local tables +CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate'); +ALTER PUBLICATION pubtables_orig DROP TABLE test; +ALTER PUBLICATION pubtables_orig ADD TABLE test; +-- publication will be empty on worker nodes, since all tables are local +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables_orig WITH (publish_via_partition_root = ''false'', publish = ''insert, truncate'')'); +(1 row) + +-- distribute a table, creating a mixed publication +SELECT create_distributed_table('test','x', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- some generic operations +ALTER PUBLICATION pubtables_orig RENAME TO pubtables; +ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete'); +ALTER PUBLICATION pubtables OWNER TO postgres; +ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete'); +ERROR: unrecognized "publish" value: "inert" +ALTER PUBLICATION pubtables ADD TABLE notexist; +ERROR: relation "notexist" does not exist +-- operations with a distributed table +ALTER PUBLICATION pubtables DROP TABLE test; +ALTER PUBLICATION pubtables ADD TABLE test; +ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs"; +-- operations with a local table in a mixed publication +ALTER PUBLICATION pubtables DROP TABLE "test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "test-pubs"; +SELECT create_distributed_table('"test-pubs"', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- test and test-pubs will show up in worker nodes +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete'')'); +(1 row) + +-- operations with a strangely named distributed table in a mixed publication +ALTER PUBLICATION pubtables DROP TABLE "test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "test-pubs"; +-- create a publication with distributed and local tables +DROP PUBLICATION pubtables; +CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs"; +-- change distributed tables +SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true); +NOTICE: creating a new table for publication.test +NOTICE: moving the data of publication.test +NOTICE: dropping the old publication.test +NOTICE: renaming the new table to publication.test +NOTICE: creating a new table for publication."test-pubs" +NOTICE: moving the data of publication."test-pubs" +NOTICE: dropping the old publication."test-pubs" +NOTICE: renaming the new table to publication."test-pubs" + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('test'); +NOTICE: creating a new table for publication.test +NOTICE: moving the data of publication.test +NOTICE: dropping the old publication.test +NOTICE: renaming the new table to publication.test + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('test'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table_concurrently('test', 'x'); + create_distributed_table_concurrently +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('"test-pubs"'); +NOTICE: creating a new table for publication."test-pubs" +NOTICE: moving the data of publication."test-pubs" +NOTICE: dropping the old publication."test-pubs" +NOTICE: renaming the new table to publication."test-pubs" + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('"test-pubs"'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- publications are unchanged despite various tranformations +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubtables FOR TABLE publication.test, publication."test-pubs" WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- partitioned table +CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a); +CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10); +ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey; +CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20); +ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a); +ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey; +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true'); +SELECT create_distributed_table('testpub_partitioned', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION pubpartitioned FOR TABLE publication.testpub_partitioned WITH (publish_via_partition_root = ''true'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +DROP PUBLICATION pubpartitioned; +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true'); +-- add a partition +ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; +ERROR: malformed array literal: "" +DETAIL: Array value must start with "{" or dimension information. +-- make sure we can sync all the publication metadata +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +DROP PUBLICATION pubempty; +DROP PUBLICATION pubtables; +DROP PUBLICATION pubinsertonly; +DROP PUBLICATION "pub-all-insertupdateonly"; +DROP PUBLICATION "pub-all"; +DROP PUBLICATION pubpartitioned; +DROP PUBLICATION pubnotdistributed; +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +SET client_min_messages TO ERROR; +DROP SCHEMA publication CASCADE; +DROP SCHEMA "publication-1" CASCADE; +SELECT citus_remove_node('localhost', :master_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +\q diff --git a/src/test/regress/expected/split_shard.out b/src/test/regress/expected/split_shard.out index 069ff306f..7186b27d2 100644 --- a/src/test/regress/expected/split_shard.out +++ b/src/test/regress/expected/split_shard.out @@ -61,7 +61,9 @@ SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +RESET citus.enable_ddl_propagation; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info @@ -261,7 +263,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +RESET citus.enable_ddl_propagation; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info @@ -428,7 +432,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; -- Create publication at worker1 +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; +RESET citus.enable_ddl_propagation; -- Worker1 is target for table_to_split_2 and table_to_split_3 SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, @@ -597,8 +603,10 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 \c - postgres - :worker_1_port SET search_path TO split_shard_replication_setup_schema; +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; +RESET citus.enable_ddl_propagation; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, diff --git a/src/test/regress/sql/logical_replication.sql b/src/test/regress/sql/logical_replication.sql index f155aaa49..e78b0a393 100644 --- a/src/test/regress/sql/logical_replication.sql +++ b/src/test/regress/sql/logical_replication.sql @@ -21,7 +21,9 @@ SELECT 1 from citus_add_node('localhost', :master_port, groupId := 0); -- This allows us to test the cleanup logic at the start of the shard move. \c - - - :worker_1_port SET search_path TO logical_replication; +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_6830000; +RESET citus.enable_ddl_propagation; \c - - - :master_port SET search_path TO logical_replication; @@ -72,6 +74,9 @@ SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; SELECT count(*) from dist; +DROP PUBLICATION citus_shard_move_publication_:postgres_oid; +SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid); + \c - - - :worker_2_port SET search_path TO logical_replication; @@ -88,3 +93,4 @@ ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE; ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE); DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid; DROP SCHEMA logical_replication CASCADE; +SELECT public.wait_for_resource_cleanup(); diff --git a/src/test/regress/sql/metadata_sync_helpers.sql b/src/test/regress/sql/metadata_sync_helpers.sql index af4bc9247..856ec0bfb 100644 --- a/src/test/regress/sql/metadata_sync_helpers.sql +++ b/src/test/regress/sql/metadata_sync_helpers.sql @@ -429,12 +429,15 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse - CREATE TABLE publication_test_table(id int); - CREATE PUBLICATION publication_test FOR TABLE publication_test_table; + CREATE OPERATOR === ( + LEFTARG = int, + RIGHTARG = int, + FUNCTION = int4eq + ); SET ROLE metadata_sync_helper_role; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) - AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false)) + AS (VALUES ('operator', ARRAY['===']::text[], ARRAY['int','int']::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data; ROLLBACK; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql new file mode 100644 index 000000000..3fd6128b8 --- /dev/null +++ b/src/test/regress/sql/publication.sql @@ -0,0 +1,269 @@ +CREATE SCHEMA publication; +CREATE SCHEMA "publication-1"; +SET search_path TO publication; +SET citus.shard_replication_factor TO 1; + +-- for citus_add_local_table_to_metadata / create_distributed_table_concurrently +SELECT citus_set_coordinator_host('localhost', :master_port); + +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; + +\c - - - :worker_1_port +SET citus.enable_ddl_propagation TO off; + +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; + +\c - - - :worker_2_port +SET citus.enable_ddl_propagation TO off; + +CREATE OR REPLACE FUNCTION activate_node_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION activate_node_snapshot() + IS 'commands to activate node snapshot'; + +-- create some publications with conflicting names on worker node + +-- publication will be different from coordinator +CREATE PUBLICATION "pub-all"; +-- publication will be same as coordinator +CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update');; + +\c - - - :master_port +SET search_path TO publication; +SET citus.shard_replication_factor TO 1; + +-- do not create publications on worker 2 initially +SELECT citus_remove_node('localhost', :worker_2_port); + +-- create a non-distributed publication +SET citus.enable_ddl_propagation TO off; +CREATE PUBLICATION pubnotdistributed WITH (publish = 'delete'); +RESET citus.enable_ddl_propagation; +ALTER PUBLICATION pubnotdistributed SET (publish = 'truncate'); + +-- create regular, distributed publications +CREATE PUBLICATION pubempty; +CREATE PUBLICATION pubinsertonly WITH (publish = 'insert'); +CREATE PUBLICATION "pub-all" FOR ALL TABLES; +CREATE PUBLICATION "pub-all-insertupdateonly" FOR ALL TABLES WITH (publish = 'insert, update'); + +-- add worker 2 with publications +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + +-- Check publications on all the nodes, if we see the same publication name twice then its definition differs +-- Note that publications are special in the sense that the coordinator object might differ from +-- worker objects due to the presence of regular tables. +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' ORDER BY 1) s$$) + ORDER BY c) s; + +CREATE TABLE test (x int primary key, y int, "column-1" int, doc xml); +CREATE TABLE "test-pubs" (x int primary key, y int, "column-1" int); +CREATE TABLE "publication-1"."test-pubs" (x int primary key, y int, "column-1" int); + +-- various operations on a publication with only local tables +CREATE PUBLICATION pubtables_orig FOR TABLE test, "test-pubs", "publication-1"."test-pubs" WITH (publish = 'insert, truncate'); +ALTER PUBLICATION pubtables_orig DROP TABLE test; +ALTER PUBLICATION pubtables_orig ADD TABLE test; + +-- publication will be empty on worker nodes, since all tables are local +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- distribute a table, creating a mixed publication +SELECT create_distributed_table('test','x', colocate_with := 'none'); + +-- some generic operations +ALTER PUBLICATION pubtables_orig RENAME TO pubtables; +ALTER PUBLICATION pubtables SET (publish = 'insert, update, delete'); +ALTER PUBLICATION pubtables OWNER TO postgres; +ALTER PUBLICATION pubtables SET (publish = 'inert, update, delete'); +ALTER PUBLICATION pubtables ADD TABLE notexist; + +-- operations with a distributed table +ALTER PUBLICATION pubtables DROP TABLE test; +ALTER PUBLICATION pubtables ADD TABLE test; +ALTER PUBLICATION pubtables SET TABLE test, "test-pubs", "publication-1"."test-pubs"; + +-- operations with a local table in a mixed publication +ALTER PUBLICATION pubtables DROP TABLE "test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "test-pubs"; + +SELECT create_distributed_table('"test-pubs"', 'x'); + +-- test and test-pubs will show up in worker nodes +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- operations with a strangely named distributed table in a mixed publication +ALTER PUBLICATION pubtables DROP TABLE "test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "test-pubs"; + +-- create a publication with distributed and local tables +DROP PUBLICATION pubtables; +CREATE PUBLICATION pubtables FOR TABLE test, "test-pubs", "publication-1"."test-pubs"; + +-- change distributed tables +SELECT alter_distributed_table('test', shard_count := 5, cascade_to_colocated := true); +SELECT undistribute_table('test'); +SELECT citus_add_local_table_to_metadata('test'); +SELECT create_distributed_table_concurrently('test', 'x'); +SELECT undistribute_table('"test-pubs"'); +SELECT create_reference_table('"test-pubs"'); + +-- publications are unchanged despite various tranformations +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- partitioned table +CREATE TABLE testpub_partitioned (a int, b text, c text) PARTITION BY RANGE (a); +CREATE TABLE testpub_partitioned_0 PARTITION OF testpub_partitioned FOR VALUES FROM (1) TO (10); +ALTER TABLE testpub_partitioned_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_partitioned_0 REPLICA IDENTITY USING INDEX testpub_partitioned_0_pkey; +CREATE TABLE testpub_partitioned_1 PARTITION OF testpub_partitioned FOR VALUES FROM (11) TO (20); +ALTER TABLE testpub_partitioned_1 ADD PRIMARY KEY (a); +ALTER TABLE testpub_partitioned_1 REPLICA IDENTITY USING INDEX testpub_partitioned_1_pkey; +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true'); + +SELECT create_distributed_table('testpub_partitioned', 'a'); + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; + +DROP PUBLICATION pubpartitioned; +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned WITH (publish_via_partition_root = 'true'); + +-- add a partition +ALTER PUBLICATION pubpartitioned ADD TABLE testpub_partitioned_1; + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLIATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- make sure we can sync all the publication metadata +SELECT start_metadata_sync_to_all_nodes(); + +DROP PUBLICATION pubempty; +DROP PUBLICATION pubtables; +DROP PUBLICATION pubinsertonly; +DROP PUBLICATION "pub-all-insertupdateonly"; +DROP PUBLICATION "pub-all"; +DROP PUBLICATION pubpartitioned; +DROP PUBLICATION pubnotdistributed; + +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +SET client_min_messages TO ERROR; +DROP SCHEMA publication CASCADE; +DROP SCHEMA "publication-1" CASCADE; + +SELECT citus_remove_node('localhost', :master_port); +\q +\endif + +-- recreate a mixed publication +CREATE PUBLICATION pubtables FOR TABLE test, "publication-1"."test-pubs"; + +-- operations on an existing distributed table +ALTER PUBLICATION pubtables DROP TABLE test; +ALTER PUBLICATION pubtables ADD TABLE test (y); +ALTER PUBLICATION pubtables SET TABLE test WHERE (doc IS DOCUMENT); +ALTER PUBLICATION pubtables SET TABLE test WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE doc)); +ALTER PUBLICATION pubtables SET TABLE test WHERE (CASE x WHEN 5 THEN true ELSE false END); + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + +ALTER PUBLICATION pubtables SET TABLE test ("column-1", x) WHERE (x > "column-1"), "publication-1"."test-pubs"; + +-- operations on a local table +ALTER PUBLICATION pubtables DROP TABLE "publication-1"."test-pubs"; +ALTER PUBLICATION pubtables ADD TABLE "publication-1"."test-pubs" (y); + +-- mixed operations +ALTER PUBLICATION pubtables SET TABLE test, TABLES IN SCHEMA "publication-1", TABLES IN SCHEMA current_schema; +ALTER PUBLICATION pubtables SET TABLE "publication-1"."test-pubs", test ("column-1", x) WHERE (x > "column-1"); + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubtables%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- publication with schemas +CREATE PUBLICATION "pub-mix" FOR TABLE test, TABLES IN SCHEMA current_schema, TABLE "publication-1"."test-pubs", TABLES IN SCHEMA "publication-1"; + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pub-mix%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- publication on a partitioned table +CREATE PUBLICATION pubpartitioned FOR TABLE testpub_partitioned (a, b) WITH (publish_via_partition_root = 'true'); +ALTER PUBLICATION pubpartitioned SET (publish_via_partition_root = 1); + +SELECT alter_distributed_table('testpub_partitioned', shard_count := 6, cascade_to_colocated := true); + +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%pubpartitioned%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- make sure we propagate schema dependencies +SET citus.create_object_propagation TO 'deferred'; +BEGIN; +CREATE SCHEMA deptest; +END; +CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest; +RESET citus.create_object_propagation; +DROP SCHEMA deptest CASCADE; + +-- make sure we can sync all the publication metadata +SELECT start_metadata_sync_to_all_nodes(); + +DROP PUBLICATION pubdep; +DROP PUBLICATION "pub-mix"; +DROP PUBLICATION pubtables; +DROP PUBLICATION pubpartitioned; + +SET client_min_messages TO ERROR; +DROP SCHEMA publication CASCADE; +DROP SCHEMA "publication-1" CASCADE; + +SELECT citus_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/split_shard.sql b/src/test/regress/sql/split_shard.sql index f7c105076..1e601fb4f 100644 --- a/src/test/regress/sql/split_shard.sql +++ b/src/test/regress/sql/split_shard.sql @@ -64,7 +64,9 @@ CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +RESET citus.enable_ddl_propagation; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, @@ -176,7 +178,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +RESET citus.enable_ddl_propagation; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, @@ -282,7 +286,9 @@ SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; -- Create publication at worker1 +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; +RESET citus.enable_ddl_propagation; -- Worker1 is target for table_to_split_2 and table_to_split_3 SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ @@ -401,8 +407,10 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 \c - postgres - :worker_1_port SET search_path TO split_shard_replication_setup_schema; +SET citus.enable_ddl_propagation TO off; CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; +RESET citus.enable_ddl_propagation; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,