diff --git a/.circleci/config.yml b/.circleci/config.yml index d269739f2..a1c0f1553 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,7 +6,7 @@ orbs: parameters: image_suffix: type: string - default: '-v0c8d80c' + default: '-v641cdcd' pg14_version: type: string default: '14.9' @@ -15,10 +15,10 @@ parameters: default: '15.4' pg16_version: type: string - default: '16beta3' + default: '16rc1' upgrade_pg_versions: type: string - default: '14.9-15.4-16beta3' + default: '14.9-15.4-16rc1' style_checker_tools_version: type: string default: '0.8.18' diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 788a3b8b0..fbe7cfe07 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -53,6 +53,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_partitioning_utils.h" +#include "distributed/namespace_utils.h" #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/replication_origin_session_utils.h" @@ -1764,10 +1765,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid) * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. */ - OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); /* * Push the transaction snapshot to be able to get vief definition with pg_get_viewdef @@ -1779,7 +1777,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid) char *viewDefinition = TextDatumGetCString(viewDefinitionDatum); PopActiveSnapshot(); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(query, "AS %s", viewDefinition); diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index ebc69d19b..c713ce099 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -1478,11 +1478,20 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId, static void FinalizeCitusLocalTableCreation(Oid relationId) { +#if PG_VERSION_NUM >= PG_VERSION_16 + + /* + * PG16+ supports truncate triggers on foreign tables + */ + if (RegularTable(relationId) || IsForeignTable(relationId)) +#else + /* * If it is a foreign table, then skip creating citus truncate trigger * as foreign tables do not support truncate triggers. */ if (RegularTable(relationId)) +#endif { CreateTruncateTrigger(relationId); } diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3b993250f..dc06692b3 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1256,8 +1256,17 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, colocationId, citusTableParams.replicationModel, autoConverted); +#if PG_VERSION_NUM >= PG_VERSION_16 + + /* + * PG16+ supports truncate triggers on foreign tables + */ + if (RegularTable(relationId) || IsForeignTable(relationId)) +#else + /* foreign tables do not support TRUNCATE trigger */ if (RegularTable(relationId)) +#endif { CreateTruncateTrigger(relationId); } @@ -1659,6 +1668,7 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId) ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*tableAddress, RelationRelationId, relationId); EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress)); + TrackPropagatedTableAndSequences(relationId); } diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index ceec83324..977efb145 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -112,15 +112,35 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) dependency->objectSubId, ExclusiveLock); } - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - const char *nodeName = workerNode->workerName; - uint32 nodePort = workerNode->workerPort; - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), - ddlCommands); + /* + * We need to propagate dependencies via the current user's metadata connection if + * any dependency for the target is created in the current transaction. Our assumption + * is that if we rely on a dependency created in the current transaction, then the + * current user, most probably, has permissions to create the target object as well. + * Note that, user still may not be able to create the target due to no permissions + * for any of its dependencies. But this is ok since it should be rare. + * + * If we opted to use a separate superuser connection for the target, then we would + * have visibility issues since propagated dependencies would be invisible to + * the separate connection until we locally commit. + */ + if (HasAnyDependencyInPropagatedObjects(target)) + { + SendCommandListToWorkersWithMetadata(ddlCommands); + } + else + { + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + const char *nodeName = workerNode->workerName; + uint32 nodePort = workerNode->workerPort; + + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, + CitusExtensionOwnerName(), + ddlCommands); + } } /* diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index ac4bf135e..5bddf1ede 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -50,7 +50,7 @@ static List * GetAllViews(void); static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); static Node * RecreateExtensionStmt(Oid extensionOid); -static List * GenerateGrantCommandsOnExtesionDependentFDWs(Oid extensionId); +static List * GenerateGrantCommandsOnExtensionDependentFDWs(Oid extensionId); /* @@ -985,7 +985,7 @@ CreateExtensionDDLCommand(const ObjectAddress *extensionAddress) /* any privilege granted on FDWs that belong to the extension should be included */ List *FDWGrants = - GenerateGrantCommandsOnExtesionDependentFDWs(extensionAddress->objectId); + GenerateGrantCommandsOnExtensionDependentFDWs(extensionAddress->objectId); ddlCommands = list_concat(ddlCommands, FDWGrants); @@ -1048,11 +1048,11 @@ RecreateExtensionStmt(Oid extensionOid) /* - * GenerateGrantCommandsOnExtesionDependentFDWs returns a list of commands that GRANTs + * GenerateGrantCommandsOnExtensionDependentFDWs returns a list of commands that GRANTs * the privileges on FDWs that are depending on the given extension. */ static List * -GenerateGrantCommandsOnExtesionDependentFDWs(Oid extensionId) +GenerateGrantCommandsOnExtensionDependentFDWs(Oid extensionId) { List *commands = NIL; List *FDWOids = GetDependentFDWsToExtension(extensionId); diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index 0d5156353..7c2d50f44 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -895,7 +895,7 @@ GetForeignConstraintCommandsInternal(Oid relationId, int flags) List *foreignKeyCommands = NIL; - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); Oid foreignKeyOid = InvalidOid; foreach_oid(foreignKeyOid, foreignKeyOids) @@ -906,7 +906,7 @@ GetForeignConstraintCommandsInternal(Oid relationId, int flags) } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return foreignKeyCommands; } diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 76112ad9a..01911677d 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -909,15 +909,14 @@ GetFunctionDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace) else { Datum sqlTextDatum = (Datum) 0; - - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); sqlTextDatum = DirectFunctionCall1(pg_get_functiondef, ObjectIdGetDatum(funcOid)); createFunctionSQL = TextDatumGetCString(sqlTextDatum); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } return createFunctionSQL; diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index e5d5ac8ce..dae72ada9 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -530,7 +530,7 @@ GetExplicitStatisticsCommandList(Oid relationId) RelationClose(relation); /* generate fully-qualified names */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); Oid statisticsId = InvalidOid; foreach_oid(statisticsId, statisticsIdList) @@ -579,7 +579,7 @@ GetExplicitStatisticsCommandList(Oid relationId) } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return explicitStatisticsCommandList; } diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index c7776bdb6..7577dfd31 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -74,7 +74,7 @@ GetExplicitTriggerCommandList(Oid relationId) { List *createTriggerCommandList = NIL; - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); List *triggerIdList = GetExplicitTriggerIdList(relationId); @@ -116,7 +116,7 @@ GetExplicitTriggerCommandList(Oid relationId) } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return createTriggerCommandList; } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 888b3dfed..10e424623 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -77,6 +77,7 @@ #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -193,6 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt( parsetree); + if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt) { ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); @@ -207,6 +209,18 @@ multi_ProcessUtility(PlannedStmt *pstmt, PreprocessCreateExtensionStmtForCitusColumnar(parsetree); } + if (isCreateAlterExtensionUpdateCitusStmt || IsDropCitusExtensionStmt(parsetree)) + { + /* + * Citus maintains a higher level cache. We use the cache invalidation mechanism + * of Postgres to achieve cache coherency between backends. Any change to citus + * extension should be made known to other backends. We do this by invalidating the + * relcache and therefore invoking the citus registered callback that invalidates + * the citus cache in other backends. + */ + CacheInvalidateRelcacheAll(); + } + /* * Make sure that on DROP DATABASE we terminate the background daemon * associated with it. @@ -923,18 +937,10 @@ ProcessUtilityInternal(PlannedStmt *pstmt, foreach_ptr(address, addresses) { MarkObjectDistributed(address); + TrackPropagatedObject(address); } } } - - if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt)) - { - /* - * Ensure value is valid, we can't do some checks during CREATE - * EXTENSION. This is important to register some invalidation callbacks. - */ - CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */ - } } diff --git a/src/backend/distributed/commands/view.c b/src/backend/distributed/commands/view.c index 02d6815d9..7c4816144 100644 --- a/src/backend/distributed/commands/view.c +++ b/src/backend/distributed/commands/view.c @@ -479,10 +479,7 @@ AppendViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid) * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. */ - OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); /* * Push the transaction snapshot to be able to get vief definition with pg_get_viewdef @@ -494,7 +491,7 @@ AppendViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid) char *viewDefinition = TextDatumGetCString(viewDefinitionDatum); PopActiveSnapshot(); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, "AS %s ", viewDefinition); } diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 6b865e061..220ea3ec7 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -818,7 +818,7 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, * Switch to empty search_path to deparse_index_columns to produce fully- * qualified names in expressions. */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); /* index column or expression list begins here */ appendStringInfoChar(buffer, '('); @@ -855,7 +855,7 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } diff --git a/src/backend/distributed/deparser/deparse_domain_stmts.c b/src/backend/distributed/deparser/deparse_domain_stmts.c index 9891e0532..e517074ec 100644 --- a/src/backend/distributed/deparser/deparse_domain_stmts.c +++ b/src/backend/distributed/deparser/deparse_domain_stmts.c @@ -345,9 +345,9 @@ AppendAlterDomainStmtSetDefault(StringInfo buf, AlterDomainStmt *stmt) expr = TransformDefaultExpr(expr, stmt->typeName, baseTypeName); /* deparse while the searchpath is cleared to force qualification of identifiers */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *exprSql = deparse_expression(expr, NIL, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, "SET DEFAULT %s", exprSql); } @@ -443,9 +443,9 @@ AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName, elog(ERROR, "missing expression for domain constraint"); } - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *exprSql = deparse_expression(expr, NIL, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, " CHECK (%s)", exprSql); return; @@ -469,9 +469,9 @@ AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName, elog(ERROR, "missing expression for domain default"); } - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *exprSql = deparse_expression(expr, NIL, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, " DEFAULT %s", exprSql); return; diff --git a/src/backend/distributed/deparser/deparse_publication_stmts.c b/src/backend/distributed/deparser/deparse_publication_stmts.c index deb8e7285..e22333146 100644 --- a/src/backend/distributed/deparser/deparse_publication_stmts.c +++ b/src/backend/distributed/deparser/deparse_publication_stmts.c @@ -307,11 +307,11 @@ AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName, List *relationContext = deparse_context_for(tableName->relname, relation->rd_id); - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *whereClauseString = deparse_expression(whereClause, relationContext, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfoString(buf, whereClauseString); diff --git a/src/backend/distributed/deparser/deparse_table_stmts.c b/src/backend/distributed/deparser/deparse_table_stmts.c index a90d38655..ff96d7fc3 100644 --- a/src/backend/distributed/deparser/deparse_table_stmts.c +++ b/src/backend/distributed/deparser/deparse_table_stmts.c @@ -562,9 +562,9 @@ DeparseRawExprForColumnDefault(Oid relationId, Oid columnTypeId, int32 columnTyp List *deparseContext = deparse_context_for(get_rel_name(relationId), relationId); - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *defaultExprStr = deparse_expression(defaultExpr, deparseContext, false, false); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); RelationClose(relation); diff --git a/src/backend/distributed/deparser/ruleutils_14.c b/src/backend/distributed/deparser/ruleutils_14.c index b364221d8..6ab124537 100644 --- a/src/backend/distributed/deparser/ruleutils_14.c +++ b/src/backend/distributed/deparser/ruleutils_14.c @@ -53,6 +53,7 @@ #include "common/keywords.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" +#include "distributed/namespace_utils.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -610,18 +611,14 @@ pg_get_rule_expr(Node *expression) { bool showImplicitCasts = true; deparse_context context; - OverrideSearchPath *overridePath = NULL; StringInfo buffer = makeStringInfo(); /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buffer; context.namespaces = NIL; @@ -638,7 +635,7 @@ pg_get_rule_expr(Node *expression) get_rule_expr(expression, &context, showImplicitCasts); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return buffer->data; } @@ -1955,8 +1952,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, deparse_context context; deparse_namespace dpns; - OverrideSearchPath *overridePath = NULL; - /* Guard against excessively long or deeply-nested queries */ CHECK_FOR_INTERRUPTS(); check_stack_depth(); @@ -1975,12 +1970,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buf; context.namespaces = lcons(&dpns, list_copy(parentnamespace)); @@ -2031,7 +2023,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } /* ---------- diff --git a/src/backend/distributed/deparser/ruleutils_15.c b/src/backend/distributed/deparser/ruleutils_15.c index 2dded9b01..755e0f4cd 100644 --- a/src/backend/distributed/deparser/ruleutils_15.c +++ b/src/backend/distributed/deparser/ruleutils_15.c @@ -54,6 +54,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" #include "distributed/multi_router_planner.h" +#include "distributed/namespace_utils.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -624,18 +625,14 @@ pg_get_rule_expr(Node *expression) { bool showImplicitCasts = true; deparse_context context; - OverrideSearchPath *overridePath = NULL; StringInfo buffer = makeStringInfo(); /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(), since we set addCatalog to true; */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buffer; context.namespaces = NIL; @@ -652,7 +649,7 @@ pg_get_rule_expr(Node *expression) get_rule_expr(expression, &context, showImplicitCasts); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return buffer->data; } @@ -2038,8 +2035,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, deparse_context context; deparse_namespace dpns; - OverrideSearchPath *overridePath = NULL; - /* Guard against excessively long or deeply-nested queries */ CHECK_FOR_INTERRUPTS(); check_stack_depth(); @@ -2058,12 +2053,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buf; context.namespaces = lcons(&dpns, list_copy(parentnamespace)); @@ -2118,7 +2110,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } /* ---------- diff --git a/src/backend/distributed/deparser/ruleutils_16.c b/src/backend/distributed/deparser/ruleutils_16.c index 37ba6e34b..31e8823b1 100644 --- a/src/backend/distributed/deparser/ruleutils_16.c +++ b/src/backend/distributed/deparser/ruleutils_16.c @@ -54,6 +54,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" #include "distributed/multi_router_planner.h" +#include "distributed/namespace_utils.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -641,18 +642,14 @@ pg_get_rule_expr(Node *expression) { bool showImplicitCasts = true; deparse_context context; - OverrideSearchPath *overridePath = NULL; StringInfo buffer = makeStringInfo(); /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buffer; context.namespaces = NIL; @@ -669,7 +666,7 @@ pg_get_rule_expr(Node *expression) get_rule_expr(expression, &context, showImplicitCasts); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return buffer->data; } @@ -2052,8 +2049,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, deparse_context context; deparse_namespace dpns; - OverrideSearchPath *overridePath = NULL; - /* Guard against excessively long or deeply-nested queries */ CHECK_FOR_INTERRUPTS(); check_stack_depth(); @@ -2072,12 +2067,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buf; context.namespaces = lcons(&dpns, list_copy(parentnamespace)); @@ -2132,7 +2124,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } /* ---------- diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 5ccd4a512..55d0f11c5 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -133,6 +133,19 @@ typedef struct ShardIdCacheEntry int shardIndex; } ShardIdCacheEntry; +/* + * ExtensionCreatedState is used to track if citus extension has been created + * using CREATE EXTENSION command. + * UNKNOWN : MetadataCache is invalid. State is UNKNOWN. + * CREATED : Citus is created. + * NOTCREATED : Citus is not created. + */ +typedef enum ExtensionCreatedState +{ + UNKNOWN = 0, + CREATED = 1, + NOTCREATED = 2, +} ExtensionCreatedState; /* * State which should be cleared upon DROP EXTENSION. When the configuration @@ -140,7 +153,7 @@ typedef struct ShardIdCacheEntry */ typedef struct MetadataCacheData { - bool extensionLoaded; + ExtensionCreatedState extensionCreatedState; Oid distShardRelationId; Oid distPlacementRelationId; Oid distBackgroundJobRelationId; @@ -288,7 +301,6 @@ static void CreateDistTableCache(void); static void CreateShardIdCache(void); static void CreateDistObjectCache(void); static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId); -static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateConnParamsCacheCallback(Datum argument, Oid relationId); @@ -2187,16 +2199,30 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray, bool CitusHasBeenLoaded(void) { - if (!MetadataCache.extensionLoaded || creating_extension) + /* + * We do not use Citus hooks during CREATE/ALTER EXTENSION citus + * since the objects used by the C code might be not be there yet. + */ + if (creating_extension) { - /* - * Refresh if we have not determined whether the extension has been - * loaded yet, or in case of ALTER EXTENSION since we want to treat - * Citus as "not loaded" during ALTER EXTENSION citus. - */ - bool extensionLoaded = CitusHasBeenLoadedInternal(); + Oid citusExtensionOid = get_extension_oid("citus", true); - if (extensionLoaded && !MetadataCache.extensionLoaded) + if (CurrentExtensionObject == citusExtensionOid) + { + return false; + } + } + + /* + * If extensionCreatedState is UNKNOWN, query pg_extension for Citus + * and cache the result. Otherwise return the value extensionCreatedState + * indicates. + */ + if (MetadataCache.extensionCreatedState == UNKNOWN) + { + bool extensionCreated = CitusHasBeenLoadedInternal(); + + if (extensionCreated) { /* * Loaded Citus for the first time in this session, or first time after @@ -2208,31 +2234,22 @@ CitusHasBeenLoaded(void) */ StartupCitusBackend(); - /* - * InvalidateDistRelationCacheCallback resets state such as extensionLoaded - * when it notices changes to pg_dist_partition (which usually indicate - * `DROP EXTENSION citus;` has been run) - * - * Ensure InvalidateDistRelationCacheCallback will notice those changes - * by caching pg_dist_partition's oid. - * - * We skip these checks during upgrade since pg_dist_partition is not - * present during early stages of upgrade operation. - */ - DistPartitionRelationId(); - /* * This needs to be initialized so we can receive foreign relation graph * invalidation messages in InvalidateForeignRelationGraphCacheCallback(). * See the comments of InvalidateForeignKeyGraph for more context. */ DistColocationRelationId(); - } - MetadataCache.extensionLoaded = extensionLoaded; + MetadataCache.extensionCreatedState = CREATED; + } + else + { + MetadataCache.extensionCreatedState = NOTCREATED; + } } - return MetadataCache.extensionLoaded; + return (MetadataCache.extensionCreatedState == CREATED) ? true : false; } @@ -2257,15 +2274,6 @@ CitusHasBeenLoadedInternal(void) return false; } - if (creating_extension && CurrentExtensionObject == citusExtensionOid) - { - /* - * We do not use Citus hooks during CREATE/ALTER EXTENSION citus - * since the objects used by the C code might be not be there yet. - */ - return false; - } - /* citus extension exists and has been created */ return true; } @@ -4201,10 +4209,6 @@ InitializeDistCache(void) CreateShardIdCache(); InitializeDistObjectCache(); - - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, - (Datum) 0); } @@ -4754,7 +4758,7 @@ InvalidateForeignKeyGraph(void) * InvalidateDistRelationCacheCallback flushes cache entries when a relation * is updated (or flushes the entire cache). */ -static void +void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) { /* invalidate either entire cache or a specific entry */ @@ -4762,12 +4766,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) { InvalidateDistTableCache(); InvalidateDistObjectCache(); + InvalidateMetadataSystemCache(); } else { void *hashKey = (void *) &relationId; bool foundInCache = false; + if (DistTableCacheHash == NULL) + { + return; + } + CitusTableCacheEntrySlot *cacheSlot = hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache); if (foundInCache) @@ -4776,21 +4786,19 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) } /* - * If pg_dist_partition is being invalidated drop all state - * This happens pretty rarely, but most importantly happens during - * DROP EXTENSION citus; This isn't the only time when this happens - * though, it can happen for multiple other reasons, such as an - * autovacuum running ANALYZE on pg_dist_partition. Such an ANALYZE - * wouldn't really need a full Metadata cache invalidation, but we - * don't know how to differentiate between DROP EXTENSION and ANALYZE. - * So for now we simply drop it in both cases and take the slight - * temporary performance hit. + * if pg_dist_partition relcache is invalidated for some reason, + * invalidate the MetadataCache. It is likely an overkill to invalidate + * the entire cache here. But until a better fix, we keep it this way + * for postgres regression tests that includes + * REINDEX SCHEMA CONCURRENTLY pg_catalog + * command. */ if (relationId == MetadataCache.distPartitionRelationId) { InvalidateMetadataSystemCache(); } + if (relationId == MetadataCache.distObjectRelationId) { InvalidateDistObjectCache(); @@ -4830,6 +4838,11 @@ InvalidateDistTableCache(void) CitusTableCacheEntrySlot *cacheSlot = NULL; HASH_SEQ_STATUS status; + if (DistTableCacheHash == NULL) + { + return; + } + hash_seq_init(&status, DistTableCacheHash); while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL) @@ -4848,6 +4861,11 @@ InvalidateDistObjectCache(void) DistObjectCacheEntry *cacheEntry = NULL; HASH_SEQ_STATUS status; + if (DistObjectCacheHash == NULL) + { + return; + } + hash_seq_init(&status, DistObjectCacheHash); while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL) @@ -4930,8 +4948,8 @@ CreateDistObjectCache(void) /* - * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag, - * and invalidates the worker node, ConnParams, and local group ID caches. + * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionCreatedState + * flag and invalidates the worker node, ConnParams, and local group ID caches. */ void InvalidateMetadataSystemCache(void) diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 14287992e..a3f7092d1 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -612,7 +612,7 @@ GetPreLoadTableCreationCommands(Oid relationId, { List *tableDDLEventList = NIL; - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); /* fetch table schema and column option definitions */ char *tableSchemaDef = pg_get_tableschemadef_string(relationId, @@ -665,7 +665,7 @@ GetPreLoadTableCreationCommands(Oid relationId, tableDDLEventList = list_concat(tableDDLEventList, policyCommands); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return tableDDLEventList; } @@ -754,7 +754,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE int indexFlags) { /* generate fully-qualified names */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); Oid indexId = indexForm->indexrelid; bool indexImpliedByConstraint = IndexImpliedByAConstraint(indexForm); @@ -805,7 +805,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } diff --git a/src/backend/distributed/operations/replicate_none_dist_table_shard.c b/src/backend/distributed/operations/replicate_none_dist_table_shard.c index 945214aef..c28490367 100644 --- a/src/backend/distributed/operations/replicate_none_dist_table_shard.c +++ b/src/backend/distributed/operations/replicate_none_dist_table_shard.c @@ -158,7 +158,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) * local session because changes made to shards are allowed for Citus internal * backends anyway. */ - int save_nestlevel = NewGUCNestLevel(); + int saveNestLevel = NewGUCNestLevel(); SetLocalEnableLocalReferenceForeignKeys(false); SetLocalEnableManualChangesToShard(true); @@ -184,7 +184,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) bool localExecutionSupported = true; ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); - AtEOXact_GUC(true, save_nestlevel); + AtEOXact_GUC(true, saveNestLevel); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1f4cee037..e5d593295 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -109,6 +109,8 @@ #include "tcop/tcopprot.h" #include "utils/guc.h" #include "utils/guc_tables.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -554,6 +556,9 @@ _PG_init(void) "ColumnarSupportsIndexAM", true, &handle); + CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, + (Datum) 0); + INIT_COLUMNAR_SYMBOL(CompressionTypeStr_type, CompressionTypeStr); INIT_COLUMNAR_SYMBOL(IsColumnarTableAmTable_type, IsColumnarTableAmTable); INIT_COLUMNAR_SYMBOL(ReadColumnarOptions_type, ReadColumnarOptions); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 5add48009..9a7bd9089 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -19,6 +19,8 @@ #include "access/twophase.h" #include "access/xact.h" +#include "catalog/dependency.h" +#include "common/hashfn.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" @@ -30,6 +32,7 @@ #include "distributed/local_executor.h" #include "distributed/locally_reserved_shared_connections.h" #include "distributed/maintenanced.h" +#include "distributed/metadata/dependency.h" #include "distributed/multi_executor.h" #include "distributed/multi_logical_replication.h" #include "distributed/multi_explain.h" @@ -89,14 +92,25 @@ StringInfo activeSetStmts; * Though a list, we treat this as a stack, pushing on subxact contexts whenever * e.g. a SAVEPOINT is executed (though this is actually performed by providing * PostgreSQL with a sub-xact callback). At present, the context of a subxact - * includes a subxact identifier as well as any SET LOCAL statements propagated - * to workers during the sub-transaction. + * includes + * - a subxact identifier, + * - any SET LOCAL statements propagated to workers during the sub-transaction, + * - all objects propagated to workers during the sub-transaction. * * To be clear, last item of activeSubXactContexts list corresponds to top of * stack. */ static List *activeSubXactContexts = NIL; +/* + * PropagatedObjectsInTx is a set of objects propagated in the root transaction. + * We also keep track of objects propagated in sub-transactions in activeSubXactContexts. + * Any committed sub-transaction would cause the objects, which are propagated during + * the sub-transaction, to be moved to upper transaction's set. Objects are discarded + * when the sub-transaction is aborted. + */ +static HTAB *PropagatedObjectsInTx = NULL; + /* some pre-allocated memory so we don't need to call malloc() during callbacks */ MemoryContext CitusXactCallbackContext = NULL; @@ -142,11 +156,17 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction /* remaining functions */ static void AdjustMaxPreparedTransactions(void); static void PushSubXact(SubTransactionId subId); -static void PopSubXact(SubTransactionId subId); +static void PopSubXact(SubTransactionId subId, bool commit); static void ResetGlobalVariables(void); static bool SwallowErrors(void (*func)(void)); static void ForceAllInProgressConnectionsToClose(void); static void EnsurePrepareTransactionIsAllowed(void); +static HTAB * CurrentTransactionPropagatedObjects(bool readonly); +static HTAB * ParentTransactionPropagatedObjects(bool readonly); +static void MovePropagatedObjectsToParentTransaction(void); +static bool DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, + const ObjectAddress *dependency); +static HTAB * CreateTxPropagatedObjectsHash(void); /* @@ -321,6 +341,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + ResetPropagatedObjects(); /* * Make sure that we give the shared connections back to the shared @@ -391,6 +412,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + ResetPropagatedObjects(); /* Reset any local replication origin session since transaction has been aborted.*/ ResetReplicationOriginLocalSession(); @@ -638,7 +660,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, switch (event) { /* - * Our subtransaction stack should be consistent with postgres' internal + * Our sub-transaction stack should be consistent with postgres' internal * transaction stack. In case of subxact begin, postgres calls our * callback after it has pushed the transaction into stack, so we have to * do the same even if worker commands fail, so we PushSubXact() first. @@ -672,7 +694,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { CoordinatedRemoteTransactionsSavepointRelease(subId); } - PopSubXact(subId); + PopSubXact(subId, true); /* Set CachedDuringCitusCreation to one level lower to represent citus creation is done */ @@ -706,7 +728,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { CoordinatedRemoteTransactionsSavepointRollback(subId); } - PopSubXact(subId); + PopSubXact(subId, false); /* * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus @@ -775,6 +797,9 @@ PushSubXact(SubTransactionId subId) state->subId = subId; state->setLocalCmds = activeSetStmts; + /* we lazily create hashset when any object is propagated during sub-transaction */ + state->propagatedObjects = NULL; + /* append to list and reset active set stmts for upcoming sub-xact */ activeSubXactContexts = lappend(activeSubXactContexts, state); activeSetStmts = makeStringInfo(); @@ -783,7 +808,7 @@ PushSubXact(SubTransactionId subId) /* PopSubXact pops subId from the stack of active sub-transactions. */ static void -PopSubXact(SubTransactionId subId) +PopSubXact(SubTransactionId subId, bool commit) { SubXactContext *state = llast(activeSubXactContexts); @@ -806,6 +831,16 @@ PopSubXact(SubTransactionId subId) */ activeSetStmts = state->setLocalCmds; + /* + * Keep subtransaction's propagated objects at parent transaction + * if subtransaction committed. Otherwise, discard them. + */ + if (commit) + { + MovePropagatedObjectsToParentTransaction(); + } + hash_destroy(state->propagatedObjects); + /* * Free state to avoid memory leaks when we create subxacts for each row, * e.g. in exception handling of UDFs. @@ -913,3 +948,227 @@ EnsurePrepareTransactionIsAllowed(void) errmsg("cannot use 2PC in transactions involving " "multiple servers"))); } + + +/* + * CurrentTransactionPropagatedObjects returns the objects propagated in current + * sub-transaction or the root transaction if no sub-transaction exists. + * + * If the propagated objects are readonly it will not create the hashmap if it does not + * already exist in the current sub-transaction. + */ +static HTAB * +CurrentTransactionPropagatedObjects(bool readonly) +{ + if (activeSubXactContexts == NIL) + { + /* hashset in the root transaction if there is no sub-transaction */ + if (PropagatedObjectsInTx == NULL && !readonly) + { + /* lazily create hashset for root transaction, for mutating uses */ + PropagatedObjectsInTx = CreateTxPropagatedObjectsHash(); + } + return PropagatedObjectsInTx; + } + + /* hashset in top level sub-transaction */ + SubXactContext *state = llast(activeSubXactContexts); + if (state->propagatedObjects == NULL && !readonly) + { + /* lazily create hashset for sub-transaction, for mutating uses */ + state->propagatedObjects = CreateTxPropagatedObjectsHash(); + } + return state->propagatedObjects; +} + + +/* + * ParentTransactionPropagatedObjects returns the objects propagated in parent + * transaction of active sub-transaction. It returns the root transaction if + * no sub-transaction exists. + * + * If the propagated objects are readonly it will not create the hashmap if it does not + * already exist in the target sub-transaction. + */ +static HTAB * +ParentTransactionPropagatedObjects(bool readonly) +{ + int nestingLevel = list_length(activeSubXactContexts); + if (nestingLevel <= 1) + { + /* + * The parent is the root transaction, when there is single level sub-transaction + * or no sub-transaction. + */ + if (PropagatedObjectsInTx == NULL && !readonly) + { + /* lazily create hashset for root transaction, for mutating uses */ + PropagatedObjectsInTx = CreateTxPropagatedObjectsHash(); + } + return PropagatedObjectsInTx; + } + + /* parent is upper sub-transaction */ + Assert(nestingLevel >= 2); + SubXactContext *state = list_nth(activeSubXactContexts, nestingLevel - 2); + if (state->propagatedObjects == NULL && !readonly) + { + /* lazily create hashset for parent sub-transaction */ + state->propagatedObjects = CreateTxPropagatedObjectsHash(); + } + return state->propagatedObjects; +} + + +/* + * MovePropagatedObjectsToParentTransaction moves all objects propagated in the current + * sub-transaction to the parent transaction. This should only be called when there is + * active sub-transaction. + */ +static void +MovePropagatedObjectsToParentTransaction(void) +{ + Assert(llast(activeSubXactContexts) != NULL); + HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(true); + if (currentPropagatedObjects == NULL) + { + /* nothing to move */ + return; + } + + /* + * Only after we know we have objects to move into the parent do we get a handle on + * a guaranteed existing parent hash table. This makes sure that the parents only + * get populated once there are objects to be tracked. + */ + HTAB *parentPropagatedObjects = ParentTransactionPropagatedObjects(false); + + HASH_SEQ_STATUS propagatedObjectsSeq; + hash_seq_init(&propagatedObjectsSeq, currentPropagatedObjects); + ObjectAddress *objectAddress = NULL; + while ((objectAddress = hash_seq_search(&propagatedObjectsSeq)) != NULL) + { + hash_search(parentPropagatedObjects, objectAddress, HASH_ENTER, NULL); + } +} + + +/* + * DependencyInPropagatedObjectsHash checks if dependency is in given hashset + * of propagated objects. + */ +static bool +DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, const + ObjectAddress *dependency) +{ + if (propagatedObjects == NULL) + { + return false; + } + + bool found = false; + hash_search(propagatedObjects, dependency, HASH_FIND, &found); + return found; +} + + +/* + * CreateTxPropagatedObjectsHash creates a hashset to keep track of the objects + * propagated in the current root transaction or sub-transaction. + */ +static HTAB * +CreateTxPropagatedObjectsHash(void) +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ObjectAddress); + info.entrysize = sizeof(ObjectAddress); + info.hash = tag_hash; + info.hcxt = CitusXactCallbackContext; + + int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); + return hash_create("Tx Propagated Objects", 16, &info, hashFlags); +} + + +/* + * TrackPropagatedObject adds given object into the objects propagated in the current + * sub-transaction. + */ +void +TrackPropagatedObject(const ObjectAddress *objectAddress) +{ + HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(false); + hash_search(currentPropagatedObjects, objectAddress, HASH_ENTER, NULL); +} + + +/* + * TrackPropagatedTableAndSequences adds given table and its sequences to the objects + * propagated in the current sub-transaction. + */ +void +TrackPropagatedTableAndSequences(Oid relationId) +{ + /* track table */ + ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*tableAddress, RelationRelationId, relationId); + TrackPropagatedObject(tableAddress); + + /* track its sequences */ + List *ownedSeqIdList = getOwnedSequences(relationId); + Oid ownedSeqId = InvalidOid; + foreach_oid(ownedSeqId, ownedSeqIdList) + { + ObjectAddress *seqAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*seqAddress, RelationRelationId, ownedSeqId); + TrackPropagatedObject(seqAddress); + } +} + + +/* + * ResetPropagatedObjects destroys hashset of propagated objects in the root transaction. + */ +void +ResetPropagatedObjects(void) +{ + hash_destroy(PropagatedObjectsInTx); + PropagatedObjectsInTx = NULL; +} + + +/* + * HasAnyDependencyInPropagatedObjects decides if any dependency of given object is + * propagated in the current transaction. + */ +bool +HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress) +{ + List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress); + ObjectAddress *dependency = NULL; + foreach_ptr(dependency, dependencyList) + { + /* first search in root transaction */ + if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency)) + { + return true; + } + + /* search in all nested sub-transactions */ + if (activeSubXactContexts == NIL) + { + continue; + } + SubXactContext *state = NULL; + foreach_ptr(state, activeSubXactContexts) + { + if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency)) + { + return true; + } + } + } + + return false; +} diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index a9a855fb1..03ecbea72 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -135,6 +135,21 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command) } +/* + * SendCommandListToWorkersWithMetadata sends all commands to all metadata workers + * with the current user. See `SendCommandToWorkersWithMetadata`for details. + */ +void +SendCommandListToWorkersWithMetadata(List *commands) +{ + char *command = NULL; + foreach_ptr(command, commands) + { + SendCommandToWorkersWithMetadata(command); + } +} + + /* * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSet. diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 7042ebe7e..e7007874b 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -175,12 +175,11 @@ BreakColocation(Oid sourceRelationId) */ Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock); - uint32 newColocationId = GetNextColocationId(); - bool localOnly = false; - UpdateRelationColocationGroup(sourceRelationId, newColocationId, localOnly); + uint32 oldColocationId = TableColocationId(sourceRelationId); + CreateColocationGroupForRelation(sourceRelationId); - /* if there is not any remaining table in the colocation group, delete it */ - DeleteColocationGroupIfNoTablesBelong(sourceRelationId); + /* if there is not any remaining table in the old colocation group, delete it */ + DeleteColocationGroupIfNoTablesBelong(oldColocationId); table_close(pgDistColocation, NoLock); } diff --git a/src/backend/distributed/utils/namespace_utils.c b/src/backend/distributed/utils/namespace_utils.c index a97adb573..4f822b7d2 100644 --- a/src/backend/distributed/utils/namespace_utils.c +++ b/src/backend/distributed/utils/namespace_utils.c @@ -11,22 +11,33 @@ #include "postgres.h" -#include "catalog/namespace.h" -#include "distributed/citus_ruleutils.h" #include "distributed/namespace_utils.h" +#include "utils/guc.h" #include "utils/regproc.h" /* - * PushOverrideEmptySearchPath pushes search_path to be NIL and sets addCatalog to - * true so that all objects outside of pg_catalog will be schema-prefixed. - * Afterwards, PopOverrideSearchPath can be used to revert the search_path back. + * We use the equivalent of a function SET option to allow the setting to + * persist for the exact duration of the transaction, guc.c takes care of + * undoing the setting on error. + * + * We set search_path to "pg_catalog" instead of "" to expose useful utilities. + */ +int +PushEmptySearchPath() +{ + int saveNestLevel = NewGUCNestLevel(); + (void) set_config_option("search_path", "pg_catalog", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + return saveNestLevel; +} + + +/* + * Restore the GUC variable search_path we set in PushEmptySearchPath */ void -PushOverrideEmptySearchPath(MemoryContext memoryContext) +PopEmptySearchPath(int saveNestLevel) { - OverrideSearchPath *overridePath = GetOverrideSearchPath(memoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - - PushOverrideSearchPath(overridePath); + AtEOXact_GUC(true, saveNestLevel); } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4e918ecf7..34b95b859 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -137,6 +137,8 @@ typedef enum ANY_CITUS_TABLE_TYPE } CitusTableType; +void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); + extern List * AllCitusTableIds(void); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry); diff --git a/src/include/distributed/namespace_utils.h b/src/include/distributed/namespace_utils.h index 7d64ead12..6be101d2a 100644 --- a/src/include/distributed/namespace_utils.h +++ b/src/include/distributed/namespace_utils.h @@ -10,6 +10,7 @@ #ifndef NAMESPACE_UTILS_H #define NAMESPACE_UTILS_H -extern void PushOverrideEmptySearchPath(MemoryContext memoryContext); +extern int PushEmptySearchPath(void); +extern void PopEmptySearchPath(int saveNestLevel); #endif /* NAMESPACE_UTILS_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index e2d35048a..ca4e632a9 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -10,11 +10,13 @@ #define TRANSACTION_MANAGMENT_H #include "access/xact.h" +#include "catalog/objectaddress.h" #include "lib/ilist.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" #include "lib/stringinfo.h" #include "nodes/primnodes.h" +#include "utils/hsearch.h" /* forward declare, to avoid recursive includes */ struct DistObjectCacheEntry; @@ -58,6 +60,7 @@ typedef struct SubXactContext { SubTransactionId subId; StringInfo setLocalCmds; + HTAB *propagatedObjects; } SubXactContext; /* @@ -157,6 +160,11 @@ extern bool IsMultiStatementTransaction(void); extern void EnsureDistributedTransactionId(void); extern bool MaybeExecutingUDF(void); +/* functions for tracking the objects propagated in current transaction */ +extern void TrackPropagatedObject(const ObjectAddress *objectAddress); +extern void TrackPropagatedTableAndSequences(Oid relationId); +extern void ResetPropagatedObjects(void); +extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress); /* initialization function(s) */ extern void InitializeTransactionManagement(void); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index be8fe5ed6..631940edf 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -73,6 +73,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons commandList); extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command); +extern void SendCommandListToWorkersWithMetadata(List *commands); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 66ff044d2..907102482 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -429,6 +429,10 @@ PORT_UPPER_BOUND = 32768 next_port = PORT_LOWER_BOUND +def notice_handler(diag: psycopg.errors.Diagnostic): + print(f"{diag.severity}: {diag.message_primary}") + + def cleanup_test_leftovers(nodes): """ Cleaning up test leftovers needs to be done in a specific order, because @@ -444,7 +448,7 @@ def cleanup_test_leftovers(nodes): node.cleanup_publications() for node in nodes: - node.cleanup_logical_replication_slots() + node.cleanup_replication_slots() for node in nodes: node.cleanup_schemas() @@ -526,10 +530,12 @@ class QueryRunner(ABC): def conn(self, *, autocommit=True, **kwargs): """Open a psycopg connection to this server""" self.set_default_connection_options(kwargs) - return psycopg.connect( + conn = psycopg.connect( autocommit=autocommit, **kwargs, ) + conn.add_notice_handler(notice_handler) + return conn def aconn(self, *, autocommit=True, **kwargs): """Open an asynchronous psycopg connection to this server""" @@ -572,6 +578,21 @@ class QueryRunner(ABC): with self.cur(**kwargs) as cur: cur.execute(query, params=params) + def sql_row(self, query, params=None, allow_empty_result=False, **kwargs): + """Run an SQL query that returns a single row and returns this row + + This opens a new connection and closes it once the query is done + """ + with self.cur(**kwargs) as cur: + cur.execute(query, params=params) + result = cur.fetchall() + + if allow_empty_result and len(result) == 0: + return None + + assert len(result) == 1, "sql_row returns more than one row" + return result[0] + def sql_value(self, query, params=None, allow_empty_result=False, **kwargs): """Run an SQL query that returns a single cell and return this value @@ -731,7 +752,7 @@ class Postgres(QueryRunner): # Used to track objects that we want to clean up at the end of a test self.subscriptions = set() self.publications = set() - self.logical_replication_slots = set() + self.replication_slots = set() self.schemas = set() self.users = set() @@ -983,7 +1004,7 @@ class Postgres(QueryRunner): def create_logical_replication_slot( self, name, plugin, temporary=False, twophase=False ): - self.logical_replication_slots.add(name) + self.replication_slots.add(name) self.sql( "SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)", (name, plugin, temporary, twophase), @@ -1015,12 +1036,21 @@ class Postgres(QueryRunner): ) ) - def cleanup_logical_replication_slots(self): - for slot in self.logical_replication_slots: - self.sql( - "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s", - (slot,), - ) + def cleanup_replication_slots(self): + for slot in self.replication_slots: + start = time.time() + while True: + try: + self.sql( + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s", + (slot,), + ) + except psycopg.errors.ObjectInUse: + if time.time() < start + 10: + time.sleep(0.5) + continue + raise + break def cleanup_subscriptions(self): for subscription in self.subscriptions: diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 731b1a908..2b71f5e1b 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -168,6 +168,7 @@ DEPS = { ], ), "grant_on_schema_propagation": TestDeps("minimal_schedule"), + "propagate_extension_commands": TestDeps("minimal_schedule"), } diff --git a/src/test/regress/citus_tests/test/test_extension.py b/src/test/regress/citus_tests/test/test_extension.py new file mode 100644 index 000000000..e9b90f115 --- /dev/null +++ b/src/test/regress/citus_tests/test/test_extension.py @@ -0,0 +1,44 @@ +import psycopg +import pytest + + +def test_create_drop_citus(coord): + with coord.cur() as cur1: + with coord.cur() as cur2: + # Conn1 drops the extension + # and Conn2 cannot use it. + cur1.execute("DROP EXTENSION citus") + + with pytest.raises(psycopg.errors.UndefinedFunction): + # Conn1 dropped the extension. citus_version udf + # cannot be found.sycopg.errors.UndefinedFunction + # is expected here. + cur2.execute("SELECT citus_version();") + + # Conn2 creates the extension, + # Conn1 is able to use it immediadtely. + cur2.execute("CREATE EXTENSION citus") + cur1.execute("SELECT citus_version();") + cur1.execute("DROP EXTENSION citus;") + + with coord.cur() as cur1: + with coord.cur() as cur2: + # A connection is able to create and use the extension + # within a transaction block. + cur1.execute("BEGIN;") + cur1.execute("CREATE TABLE t1(id int);") + cur1.execute("CREATE EXTENSION citus;") + cur1.execute("SELECT create_reference_table('t1')") + cur1.execute("ABORT;") + + # Conn1 aborted so Conn2 is be able to create and + # use the extension within a transaction block. + cur2.execute("BEGIN;") + cur2.execute("CREATE TABLE t1(id int);") + cur2.execute("CREATE EXTENSION citus;") + cur2.execute("SELECT create_reference_table('t1')") + cur2.execute("COMMIT;") + + # Conn2 commited so Conn1 is be able to use the + # extension immediately. + cur1.execute("SELECT citus_version();") diff --git a/src/test/regress/expected/distributed_domain.out b/src/test/regress/expected/distributed_domain.out index 5043d4f05..30e388803 100644 --- a/src/test/regress/expected/distributed_domain.out +++ b/src/test/regress/expected/distributed_domain.out @@ -947,3 +947,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist; NOTICE: type "domain_does_not_exist" does not exist, skipping SET client_min_messages TO warning; DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE; +DROP ROLE domain_owner; diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 8553a1d4d..90c78ca62 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -19,7 +19,7 @@ step s1-rebalance-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -40,8 +40,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -63,7 +63,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -102,7 +102,7 @@ step s1-rebalance-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -123,8 +123,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 2|move |t |t |f |Completed -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 2|move |t |t |f |Completed +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 2|move |t |t |f |Completed +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 2|move |t |t |f |Completed colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f |Setting Up colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f |Setting Up (4 rows) @@ -141,7 +141,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -184,7 +184,7 @@ step s1-rebalance-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -205,8 +205,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -228,7 +228,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -271,7 +271,7 @@ step s1-rebalance-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -292,8 +292,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -315,7 +315,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -358,7 +358,7 @@ step s1-rebalance-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -379,8 +379,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |t |Final Catchup colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -402,7 +402,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -445,7 +445,7 @@ step s1-shard-move-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -466,8 +466,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data (2 rows) step s5-release-advisory-lock: @@ -487,7 +487,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -530,7 +530,7 @@ step s1-shard-move-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -551,8 +551,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f |Copying Data (2 rows) step s6-release-advisory-lock: @@ -572,7 +572,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -616,7 +616,7 @@ step s1-shard-copy-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -637,8 +637,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data (2 rows) step s5-release-advisory-lock: @@ -658,7 +658,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -702,7 +702,7 @@ step s1-shard-copy-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -723,8 +723,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|copy |t |t |f |Copying Data (2 rows) step s6-release-advisory-lock: @@ -744,7 +744,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -787,7 +787,7 @@ step s1-shard-move-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -808,8 +808,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up (2 rows) step s5-release-advisory-lock: @@ -829,7 +829,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -872,7 +872,7 @@ step s1-shard-move-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -893,8 +893,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |t |Final Catchup (2 rows) step s6-release-advisory-lock: @@ -914,7 +914,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -958,7 +958,7 @@ step s1-shard-copy-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -979,8 +979,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up (2 rows) step s5-release-advisory-lock: @@ -1000,7 +1000,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1044,7 +1044,7 @@ step s1-shard-copy-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1065,8 +1065,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |t |Final Catchup -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |t |Final Catchup +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|copy |t |t |t |Final Catchup +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|copy |t |t |t |Final Catchup (2 rows) step s6-release-advisory-lock: @@ -1086,7 +1086,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1132,7 +1132,7 @@ step s4-shard-move-sep-block-writes: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1153,9 +1153,9 @@ step s7-get-progress-ordered: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f +separate |1500009| 200000|localhost | 57637| 200000|localhost | 57638| 8000| 1|move |t |t |f (3 rows) step s5-release-advisory-lock: @@ -1182,7 +1182,7 @@ step s1-wait: step s4-wait: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1228,7 +1228,7 @@ step s4-shard-move-sep-block-writes: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1249,9 +1249,9 @@ step s7-get-progress-ordered: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f +separate |1500009| 200000|localhost | 57637| 200000|localhost | 57638| 200000| 1|move |t |t |f (3 rows) step s6-release-advisory-lock: @@ -1278,7 +1278,7 @@ step s1-wait: step s4-wait: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index dcb87486d..2de95266b 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1393,6 +1393,284 @@ BEGIN; ALTER SCHEMA bar RENAME TO foo; ROLLBACK; +-- below tests are to verify dependency propagation with nested sub-transactions +-- TEST1 +BEGIN; + CREATE SCHEMA sc1; + CREATE SEQUENCE sc1.seq; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to sequence sc1.seq +drop cascades to table sc1.s1 +-- TEST2 +CREATE SCHEMA sc1; +BEGIN; + CREATE SEQUENCE sc1.seq1; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to sequence sc1.seq1 +drop cascades to table sc1.s1 +-- TEST3 +SET citus.enable_metadata_sync TO off; +CREATE SCHEMA sc1; +SET citus.enable_metadata_sync TO on; +BEGIN; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST4 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST5 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST6 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST7 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST8 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + ROLLBACK TO SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST9 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + ROLLBACK TO SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST10 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +-- TEST11 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; +-- TEST12 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SEQUENCE seq1; + CREATE SCHEMA sc1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; +-- issue-6614 +CREATE FUNCTION create_schema_test() RETURNS void AS $$ +BEGIN + SET citus.create_object_propagation = 'deferred'; + CREATE SCHEMA test_1; + CREATE TABLE test_1.test ( + id bigserial constraint test_pk primary key, + creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null + ); + PERFORM create_reference_table('test_1.test'); + RETURN; +END; +$$ LANGUAGE plpgsql; +SELECT create_schema_test(); + create_schema_test +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$); + result +--------------------------------------------------------------------- + t + t + t +(3 rows) + +DROP FUNCTION create_schema_test; +DROP SCHEMA test_1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table test_1.test +drop cascades to table test_1.test_1197064 -- Clean up the created schema SET client_min_messages TO WARNING; SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object diff --git a/src/test/regress/expected/pg16.out b/src/test/regress/expected/pg16.out index 8c0fdc859..a8ff5e47d 100644 --- a/src/test/regress/expected/pg16.out +++ b/src/test/regress/expected/pg16.out @@ -314,6 +314,80 @@ SELECT result FROM run_command_on_workers (2 rows) SET search_path TO pg16; +SET citus.next_shard_id TO 951000; +-- Foreign table TRUNCATE trigger +-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/3b00a94 +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SET citus.use_citus_managed_tables TO ON; +CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial); +INSERT INTO foreign_table_test VALUES (1, 'text_test'); +CREATE EXTENSION postgres_fdw; +CREATE SERVER foreign_server + FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (host 'localhost', port :'master_port', dbname 'regression'); +CREATE USER MAPPING FOR CURRENT_USER + SERVER foreign_server + OPTIONS (user 'postgres'); +CREATE FOREIGN TABLE foreign_table ( + id integer NOT NULL, + data text, + a bigserial +) + SERVER foreign_server + OPTIONS (schema_name 'pg16', table_name 'foreign_table_test'); +-- verify it's a Citus foreign table +SELECT partmethod, repmodel FROM pg_dist_partition +WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid; + partmethod | repmodel +--------------------------------------------------------------------- + n | s +(1 row) + +INSERT INTO foreign_table VALUES (2, 'test_2'); +INSERT INTO foreign_table_test VALUES (3, 'test_3'); +CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; +CREATE FUNCTION trigger_func_on_shard() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func_on_shard(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; +CREATE TRIGGER trig_stmt_before BEFORE TRUNCATE ON foreign_table + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); +SET citus.override_table_visibility TO off; +CREATE TRIGGER trig_stmt_shard_before BEFORE TRUNCATE ON foreign_table_951001 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func_on_shard(); +RESET citus.override_table_visibility; +SELECT * FROM foreign_table ORDER BY 1; + id | data | a +--------------------------------------------------------------------- + 1 | text_test | 1 + 2 | test_2 | 1 + 3 | test_3 | 2 +(3 rows) + +TRUNCATE foreign_table; +NOTICE: trigger_func() called: action = TRUNCATE, when = BEFORE, level = STATEMENT +CONTEXT: PL/pgSQL function trigger_func() line XX at RAISE +NOTICE: trigger_func_on_shard() called: action = TRUNCATE, when = BEFORE, level = STATEMENT +CONTEXT: PL/pgSQL function trigger_func_on_shard() line XX at RAISE +SELECT * FROM foreign_table ORDER BY 1; + id | data | a +--------------------------------------------------------------------- +(0 rows) + +RESET citus.use_citus_managed_tables; -- -- COPY FROM ... DEFAULT -- Already supported in Citus, adding all PG tests with a distributed table @@ -676,6 +750,62 @@ SELECT result FROM run_command_on_workers REINDEX (2 rows) +-- +-- random_normal() to provide normally-distributed random numbers +-- adding here the same tests as the ones with random() in aggregate_support.sql +-- Relevant PG commit: https://github.com/postgres/postgres/commit/38d8176 +-- +CREATE TABLE dist_table (dist_col int, agg_col numeric); +SELECT create_distributed_table('dist_table', 'dist_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_table (int_col int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- Test the cases where the worker agg exec. returns no tuples. +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + percentile_disc +--------------------------------------------------------------------- + +(1 row) + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + percentile_disc +--------------------------------------------------------------------- + +(1 row) + +-- run the same queries after loading some data +INSERT INTO dist_table VALUES (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25), + (4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19); +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + percentile_disc +--------------------------------------------------------------------- + 3.22 +(1 row) + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + percentile_disc +--------------------------------------------------------------------- + 1.19 +(1 row) + \set VERBOSITY terse SET client_min_messages TO ERROR; +DROP EXTENSION postgres_fdw CASCADE; DROP SCHEMA pg16 CASCADE; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 702d23f1f..c761efb3e 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -375,6 +375,158 @@ END; CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest; RESET citus.create_object_propagation; DROP SCHEMA deptest CASCADE; +-- +-- PG16 allows publications with schema and table of the same schema. +-- backpatched to PG15 +-- Relevant PG commit: https://github.com/postgres/postgres/commit/13a185f +-- +CREATE SCHEMA publication2; +CREATE TABLE publication2.test1 (id int); +SELECT create_distributed_table('publication2.test1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- should be able to create publication with schema and table of the same +-- schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +CREATE TABLE publication.test2 (id int); +SELECT create_distributed_table('publication.test2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication.test2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication.test2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- should be able to have publication2 schema and its new table test2 in testpub_for_tbl_schema publication +ALTER TABLE test2 SET SCHEMA publication2; +-- should be able to add a table of the same schema to the schema publication +CREATE TABLE publication2.test3 (x int primary key, y int, "column-1" int); +SELECT create_distributed_table('publication2.test3', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication2.test2, TABLE publication2.test3 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- should be able to drop the table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication2.test2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +DROP PUBLICATION testpub_for_tbl_schema; +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +-- should be able to set publication with schema and table of the same schema +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE (id < 99); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE ((test1.id < 99)) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- test that using column list for table is disallowed if any schemas are +-- part of the publication +DROP PUBLICATION testpub_for_tbl_schema; +-- failure - cannot use column list and schema together +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +-- ok - only publish schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- failure - add a table with column list when there is already a schema in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +-- ok - only publish table with column list +ALTER PUBLICATION testpub_for_tbl_schema SET TABLE publication2.test3(y); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLE publication2.test3 (y) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- failure - specify a schema when there is already a column list in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2; +ERROR: cannot add schema to publication "testpub_for_tbl_schema" +DETAIL: Schemas cannot be added if any tables that specify a column list are already part of the publication. +-- failure - cannot SET column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +-- ok - drop table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- failure - cannot ADD column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2, TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. -- make sure we can sync all the publication metadata SELECT start_metadata_sync_to_all_nodes(); start_metadata_sync_to_all_nodes @@ -386,7 +538,9 @@ DROP PUBLICATION pubdep; DROP PUBLICATION "pub-mix"; DROP PUBLICATION pubtables; DROP PUBLICATION pubpartitioned; +DROP PUBLICATION testpub_for_tbl_schema; SET client_min_messages TO ERROR; DROP SCHEMA publication CASCADE; DROP SCHEMA "publication-1" CASCADE; DROP SCHEMA citus_schema_1 CASCADE; +DROP SCHEMA publication2 CASCADE; diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index e329e9483..234703c21 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -131,7 +131,7 @@ session "s7" step "s7-get-progress" { set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -157,7 +157,7 @@ step "s7-get-progress" step "s7-get-progress-ordered" { set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, diff --git a/src/test/regress/sql/distributed_domain.sql b/src/test/regress/sql/distributed_domain.sql index b03a2040f..5bf3bd6a8 100644 --- a/src/test/regress/sql/distributed_domain.sql +++ b/src/test/regress/sql/distributed_domain.sql @@ -487,3 +487,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist; SET client_min_messages TO warning; DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE; +DROP ROLE domain_owner; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index d870b624f..146cf78d4 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -995,6 +995,219 @@ BEGIN; ALTER SCHEMA bar RENAME TO foo; ROLLBACK; +-- below tests are to verify dependency propagation with nested sub-transactions +-- TEST1 +BEGIN; + CREATE SCHEMA sc1; + CREATE SEQUENCE sc1.seq; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST2 +CREATE SCHEMA sc1; +BEGIN; + CREATE SEQUENCE sc1.seq1; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST3 +SET citus.enable_metadata_sync TO off; +CREATE SCHEMA sc1; +SET citus.enable_metadata_sync TO on; +BEGIN; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST4 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST5 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST6 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST7 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST8 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + ROLLBACK TO SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST9 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + ROLLBACK TO SAVEPOINT sp2; + + SAVEPOINT sp3; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST10 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; + +-- TEST11 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; + +-- TEST12 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SEQUENCE seq1; + CREATE SCHEMA sc1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; + +-- issue-6614 +CREATE FUNCTION create_schema_test() RETURNS void AS $$ +BEGIN + SET citus.create_object_propagation = 'deferred'; + CREATE SCHEMA test_1; + CREATE TABLE test_1.test ( + id bigserial constraint test_pk primary key, + creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null + ); + PERFORM create_reference_table('test_1.test'); + RETURN; +END; +$$ LANGUAGE plpgsql; +SELECT create_schema_test(); +SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$); +DROP FUNCTION create_schema_test; +DROP SCHEMA test_1 CASCADE; + -- Clean up the created schema SET client_min_messages TO WARNING; diff --git a/src/test/regress/sql/pg16.sql b/src/test/regress/sql/pg16.sql index 1df96e6a7..8cffb917e 100644 --- a/src/test/regress/sql/pg16.sql +++ b/src/test/regress/sql/pg16.sql @@ -146,6 +146,63 @@ DROP DATABASE test_db; SELECT result FROM run_command_on_workers ($$DROP DATABASE test_db$$); SET search_path TO pg16; +SET citus.next_shard_id TO 951000; + +-- Foreign table TRUNCATE trigger +-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/3b00a94 +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); +SET citus.use_citus_managed_tables TO ON; +CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial); +INSERT INTO foreign_table_test VALUES (1, 'text_test'); +CREATE EXTENSION postgres_fdw; +CREATE SERVER foreign_server + FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (host 'localhost', port :'master_port', dbname 'regression'); +CREATE USER MAPPING FOR CURRENT_USER + SERVER foreign_server + OPTIONS (user 'postgres'); +CREATE FOREIGN TABLE foreign_table ( + id integer NOT NULL, + data text, + a bigserial +) + SERVER foreign_server + OPTIONS (schema_name 'pg16', table_name 'foreign_table_test'); + +-- verify it's a Citus foreign table +SELECT partmethod, repmodel FROM pg_dist_partition +WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid; + +INSERT INTO foreign_table VALUES (2, 'test_2'); +INSERT INTO foreign_table_test VALUES (3, 'test_3'); + +CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; + +CREATE FUNCTION trigger_func_on_shard() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func_on_shard(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; + +CREATE TRIGGER trig_stmt_before BEFORE TRUNCATE ON foreign_table + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); +SET citus.override_table_visibility TO off; +CREATE TRIGGER trig_stmt_shard_before BEFORE TRUNCATE ON foreign_table_951001 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func_on_shard(); +RESET citus.override_table_visibility; + +SELECT * FROM foreign_table ORDER BY 1; +TRUNCATE foreign_table; +SELECT * FROM foreign_table ORDER BY 1; + +RESET citus.use_citus_managed_tables; -- -- COPY FROM ... DEFAULT @@ -388,6 +445,42 @@ REINDEX SYSTEM; SELECT result FROM run_command_on_workers ($$REINDEX SYSTEM$$); +-- +-- random_normal() to provide normally-distributed random numbers +-- adding here the same tests as the ones with random() in aggregate_support.sql +-- Relevant PG commit: https://github.com/postgres/postgres/commit/38d8176 +-- + +CREATE TABLE dist_table (dist_col int, agg_col numeric); +SELECT create_distributed_table('dist_table', 'dist_col'); + +CREATE TABLE ref_table (int_col int); +SELECT create_reference_table('ref_table'); + +-- Test the cases where the worker agg exec. returns no tuples. + +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + +-- run the same queries after loading some data + +INSERT INTO dist_table VALUES (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25), + (4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19); + +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + \set VERBOSITY terse SET client_min_messages TO ERROR; +DROP EXTENSION postgres_fdw CASCADE; DROP SCHEMA pg16 CASCADE; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 8bd2ea923..06bdc39fe 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -273,6 +273,110 @@ CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest; RESET citus.create_object_propagation; DROP SCHEMA deptest CASCADE; +-- +-- PG16 allows publications with schema and table of the same schema. +-- backpatched to PG15 +-- Relevant PG commit: https://github.com/postgres/postgres/commit/13a185f +-- + +CREATE SCHEMA publication2; +CREATE TABLE publication2.test1 (id int); +SELECT create_distributed_table('publication2.test1', 'id'); + +-- should be able to create publication with schema and table of the same +-- schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +CREATE TABLE publication.test2 (id int); +SELECT create_distributed_table('publication.test2', 'id'); +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication.test2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- should be able to have publication2 schema and its new table test2 in testpub_for_tbl_schema publication +ALTER TABLE test2 SET SCHEMA publication2; + +-- should be able to add a table of the same schema to the schema publication +CREATE TABLE publication2.test3 (x int primary key, y int, "column-1" int); +SELECT create_distributed_table('publication2.test3', 'x'); +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- should be able to drop the table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +DROP PUBLICATION testpub_for_tbl_schema; +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +-- should be able to set publication with schema and table of the same schema +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE (id < 99); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- test that using column list for table is disallowed if any schemas are +-- part of the publication +DROP PUBLICATION testpub_for_tbl_schema; + +-- failure - cannot use column list and schema together +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test3(y); + +-- ok - only publish schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- failure - add a table with column list when there is already a schema in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3(y); + +-- ok - only publish table with column list +ALTER PUBLICATION testpub_for_tbl_schema SET TABLE publication2.test3(y); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- failure - specify a schema when there is already a column list in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2; + +-- failure - cannot SET column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test3(y); + +-- ok - drop table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- failure - cannot ADD column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2, TABLE publication2.test3(y); + -- make sure we can sync all the publication metadata SELECT start_metadata_sync_to_all_nodes(); @@ -280,8 +384,10 @@ DROP PUBLICATION pubdep; DROP PUBLICATION "pub-mix"; DROP PUBLICATION pubtables; DROP PUBLICATION pubpartitioned; +DROP PUBLICATION testpub_for_tbl_schema; SET client_min_messages TO ERROR; DROP SCHEMA publication CASCADE; DROP SCHEMA "publication-1" CASCADE; DROP SCHEMA citus_schema_1 CASCADE; +DROP SCHEMA publication2 CASCADE;