Merge branch 'main' into alter_database_propagation

pull/7172/head
Gürkan İndibay 2023-09-07 11:27:59 +03:00 committed by GitHub
commit 260b8b089f
44 changed files with 1640 additions and 255 deletions

View File

@ -6,7 +6,7 @@ orbs:
parameters:
image_suffix:
type: string
default: '-v0c8d80c'
default: '-v641cdcd'
pg14_version:
type: string
default: '14.9'
@ -15,10 +15,10 @@ parameters:
default: '15.4'
pg16_version:
type: string
default: '16beta3'
default: '16rc1'
upgrade_pg_versions:
type: string
default: '14.9-15.4-16beta3'
default: '14.9-15.4-16rc1'
style_checker_tools_version:
type: string
default: '0.8.18'

View File

@ -53,6 +53,7 @@
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/namespace_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/replication_origin_session_utils.h"
@ -1764,10 +1765,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed.
*/
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
/*
* Push the transaction snapshot to be able to get view definition with pg_get_viewdef
@ -1779,7 +1777,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
char *viewDefinition = TextDatumGetCString(viewDefinitionDatum);
PopActiveSnapshot();
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
appendStringInfo(query, "AS %s", viewDefinition);

View File

@ -1478,11 +1478,20 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
static void
FinalizeCitusLocalTableCreation(Oid relationId)
{
#if PG_VERSION_NUM >= PG_VERSION_16
/*
* PG16+ supports truncate triggers on foreign tables
*/
if (RegularTable(relationId) || IsForeignTable(relationId))
#else
/*
* If it is a foreign table, then skip creating citus truncate trigger
* as foreign tables do not support truncate triggers.
*/
if (RegularTable(relationId))
#endif
{
CreateTruncateTrigger(relationId);
}

View File

@ -1256,8 +1256,17 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
colocationId, citusTableParams.replicationModel,
autoConverted);
#if PG_VERSION_NUM >= PG_VERSION_16
/*
* PG16+ supports truncate triggers on foreign tables
*/
if (RegularTable(relationId) || IsForeignTable(relationId))
#else
/* foreign tables do not support TRUNCATE trigger */
if (RegularTable(relationId))
#endif
{
CreateTruncateTrigger(relationId);
}
@ -1659,6 +1668,7 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
TrackPropagatedTableAndSequences(relationId);
}

View File

@ -112,6 +112,25 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
dependency->objectSubId, ExclusiveLock);
}
/*
* We need to propagate dependencies via the current user's metadata connection if
* any dependency for the target is created in the current transaction. Our assumption
* is that if we rely on a dependency created in the current transaction, then the
* current user most probably has permissions to create the target object as well.
* Note that the user may still be unable to create the target due to missing
* permissions on some of its dependencies, but this is OK since it should be rare.
*
* If we opted to use a separate superuser connection for the target, then we would
* have visibility issues since propagated dependencies would be invisible to
* the separate connection until we locally commit.
*/
if (HasAnyDependencyInPropagatedObjects(target))
{
SendCommandListToWorkersWithMetadata(ddlCommands);
}
else
{
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
@ -122,6 +141,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
CitusExtensionOwnerName(),
ddlCommands);
}
}
/*
* We do this after creating the objects on the workers, we make sure

View File

@ -50,7 +50,7 @@ static List * GetAllViews(void);
static bool ShouldPropagateExtensionCommand(Node *parseTree);
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
static Node * RecreateExtensionStmt(Oid extensionOid);
static List * GenerateGrantCommandsOnExtesionDependentFDWs(Oid extensionId);
static List * GenerateGrantCommandsOnExtensionDependentFDWs(Oid extensionId);
/*
@ -985,7 +985,7 @@ CreateExtensionDDLCommand(const ObjectAddress *extensionAddress)
/* any privilege granted on FDWs that belong to the extension should be included */
List *FDWGrants =
GenerateGrantCommandsOnExtesionDependentFDWs(extensionAddress->objectId);
GenerateGrantCommandsOnExtensionDependentFDWs(extensionAddress->objectId);
ddlCommands = list_concat(ddlCommands, FDWGrants);
@ -1048,11 +1048,11 @@ RecreateExtensionStmt(Oid extensionOid)
/*
* GenerateGrantCommandsOnExtesionDependentFDWs returns a list of commands that GRANTs
* GenerateGrantCommandsOnExtensionDependentFDWs returns a list of commands that GRANTs
* the privileges on FDWs that are depending on the given extension.
*/
static List *
GenerateGrantCommandsOnExtesionDependentFDWs(Oid extensionId)
GenerateGrantCommandsOnExtensionDependentFDWs(Oid extensionId)
{
List *commands = NIL;
List *FDWOids = GetDependentFDWsToExtension(extensionId);

View File

@ -895,7 +895,7 @@ GetForeignConstraintCommandsInternal(Oid relationId, int flags)
List *foreignKeyCommands = NIL;
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
Oid foreignKeyOid = InvalidOid;
foreach_oid(foreignKeyOid, foreignKeyOids)
@ -906,7 +906,7 @@ GetForeignConstraintCommandsInternal(Oid relationId, int flags)
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
return foreignKeyCommands;
}

View File

@ -909,15 +909,14 @@ GetFunctionDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace)
else
{
Datum sqlTextDatum = (Datum) 0;
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
sqlTextDatum = DirectFunctionCall1(pg_get_functiondef,
ObjectIdGetDatum(funcOid));
createFunctionSQL = TextDatumGetCString(sqlTextDatum);
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
}
return createFunctionSQL;

View File

@ -530,7 +530,7 @@ GetExplicitStatisticsCommandList(Oid relationId)
RelationClose(relation);
/* generate fully-qualified names */
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
Oid statisticsId = InvalidOid;
foreach_oid(statisticsId, statisticsIdList)
@ -579,7 +579,7 @@ GetExplicitStatisticsCommandList(Oid relationId)
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
return explicitStatisticsCommandList;
}

View File

@ -74,7 +74,7 @@ GetExplicitTriggerCommandList(Oid relationId)
{
List *createTriggerCommandList = NIL;
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
List *triggerIdList = GetExplicitTriggerIdList(relationId);
@ -116,7 +116,7 @@ GetExplicitTriggerCommandList(Oid relationId)
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
return createTriggerCommandList;
}

View File

@ -77,6 +77,7 @@
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@ -193,6 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt(
parsetree);
if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
@ -207,6 +209,18 @@ multi_ProcessUtility(PlannedStmt *pstmt,
PreprocessCreateExtensionStmtForCitusColumnar(parsetree);
}
if (isCreateAlterExtensionUpdateCitusStmt || IsDropCitusExtensionStmt(parsetree))
{
/*
* Citus maintains a higher level cache. We use the cache invalidation mechanism
* of Postgres to achieve cache coherency between backends. Any change to the citus
* extension should be made known to other backends. We do this by invalidating the
* relcache and thereby invoking the Citus-registered callback that invalidates the
* citus cache in other backends.
*/
CacheInvalidateRelcacheAll();
}
/*
* Make sure that on DROP DATABASE we terminate the background daemon
* associated with it.
@ -923,18 +937,10 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
foreach_ptr(address, addresses)
{
MarkObjectDistributed(address);
TrackPropagatedObject(address);
}
}
}
if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt))
{
/*
* Ensure value is valid, we can't do some checks during CREATE
* EXTENSION. This is important to register some invalidation callbacks.
*/
CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */
}
}
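For context, here is a minimal standalone sketch (hypothetical names, not the surrounding Citus code) of the register-and-broadcast pattern the hunk above relies on: each backend registers a relcache callback once at library load time, and a backend that creates, alters, or drops the extension broadcasts a relcache invalidation so the callback fires in every other backend as well.

#include "postgres.h"
#include "utils/inval.h"

/* hypothetical callback that resets this backend's higher-level cache */
static void
ResetHigherLevelCache(Datum argument, Oid relationId)
{
	/* relationId is InvalidOid when the entire relcache is invalidated */
}

/* called once per backend, e.g. from _PG_init() */
static void
RegisterInvalidationCallback(void)
{
	CacheRegisterRelcacheCallback(ResetHigherLevelCache, (Datum) 0);
}

/* called on CREATE/ALTER/DROP EXTENSION to notify all backends */
static void
BroadcastExtensionChange(void)
{
	/*
	 * Queue a relcache invalidation covering all relations; it reaches the
	 * other backends through the shared invalidation queue at commit.
	 */
	CacheInvalidateRelcacheAll();
}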

View File

@ -479,10 +479,7 @@ AppendViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid)
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed.
*/
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
/*
* Push the transaction snapshot to be able to get view definition with pg_get_viewdef
@ -494,7 +491,7 @@ AppendViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid)
char *viewDefinition = TextDatumGetCString(viewDefinitionDatum);
PopActiveSnapshot();
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
appendStringInfo(buf, "AS %s ", viewDefinition);
}

View File

@ -818,7 +818,7 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid,
* Switch to empty search_path to deparse_index_columns to produce fully-
* qualified names in expressions.
*/
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
/* index column or expression list begins here */
appendStringInfoChar(buffer, '(');
@ -855,7 +855,7 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid,
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
}

View File

@ -345,9 +345,9 @@ AppendAlterDomainStmtSetDefault(StringInfo buf, AlterDomainStmt *stmt)
expr = TransformDefaultExpr(expr, stmt->typeName, baseTypeName);
/* deparse while the searchpath is cleared to force qualification of identifiers */
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
char *exprSql = deparse_expression(expr, NIL, true, true);
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
appendStringInfo(buf, "SET DEFAULT %s", exprSql);
}
@ -443,9 +443,9 @@ AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName,
elog(ERROR, "missing expression for domain constraint");
}
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
char *exprSql = deparse_expression(expr, NIL, true, true);
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
appendStringInfo(buf, " CHECK (%s)", exprSql);
return;
@ -469,9 +469,9 @@ AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName,
elog(ERROR, "missing expression for domain default");
}
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
char *exprSql = deparse_expression(expr, NIL, true, true);
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
appendStringInfo(buf, " DEFAULT %s", exprSql);
return;

View File

@ -307,11 +307,11 @@ AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName,
List *relationContext = deparse_context_for(tableName->relname, relation->rd_id);
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
char *whereClauseString = deparse_expression(whereClause,
relationContext,
true, true);
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
appendStringInfoString(buf, whereClauseString);

View File

@ -562,9 +562,9 @@ DeparseRawExprForColumnDefault(Oid relationId, Oid columnTypeId, int32 columnTyp
List *deparseContext = deparse_context_for(get_rel_name(relationId), relationId);
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
char *defaultExprStr = deparse_expression(defaultExpr, deparseContext, false, false);
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
RelationClose(relation);

View File

@ -53,6 +53,7 @@
#include "common/keywords.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/namespace_utils.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "funcapi.h"
@ -610,18 +611,14 @@ pg_get_rule_expr(Node *expression)
{
bool showImplicitCasts = true;
deparse_context context;
OverrideSearchPath *overridePath = NULL;
StringInfo buffer = makeStringInfo();
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
* PushEmptySearchPath().
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
context.buf = buffer;
context.namespaces = NIL;
@ -638,7 +635,7 @@ pg_get_rule_expr(Node *expression)
get_rule_expr(expression, &context, showImplicitCasts);
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
return buffer->data;
}
@ -1955,8 +1952,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
deparse_context context;
deparse_namespace dpns;
OverrideSearchPath *overridePath = NULL;
/* Guard against excessively long or deeply-nested queries */
CHECK_FOR_INTERRUPTS();
check_stack_depth();
@ -1975,12 +1970,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
* PushEmptySearchPath().
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
context.buf = buf;
context.namespaces = lcons(&dpns, list_copy(parentnamespace));
@ -2031,7 +2023,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
}
/* ----------

View File

@ -54,6 +54,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/namespace_utils.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "funcapi.h"
@ -624,18 +625,14 @@ pg_get_rule_expr(Node *expression)
{
bool showImplicitCasts = true;
deparse_context context;
OverrideSearchPath *overridePath = NULL;
StringInfo buffer = makeStringInfo();
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
* PushEmptySearchPath().
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
context.buf = buffer;
context.namespaces = NIL;
@ -652,7 +649,7 @@ pg_get_rule_expr(Node *expression)
get_rule_expr(expression, &context, showImplicitCasts);
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
return buffer->data;
}
@ -2038,8 +2035,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
deparse_context context;
deparse_namespace dpns;
OverrideSearchPath *overridePath = NULL;
/* Guard against excessively long or deeply-nested queries */
CHECK_FOR_INTERRUPTS();
check_stack_depth();
@ -2058,12 +2053,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
* PushEmptySearchPath().
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
context.buf = buf;
context.namespaces = lcons(&dpns, list_copy(parentnamespace));
@ -2118,7 +2110,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
}
/* ----------

View File

@ -54,6 +54,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/namespace_utils.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "funcapi.h"
@ -641,18 +642,14 @@ pg_get_rule_expr(Node *expression)
{
bool showImplicitCasts = true;
deparse_context context;
OverrideSearchPath *overridePath = NULL;
StringInfo buffer = makeStringInfo();
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
* PushEmptySearchPath().
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
context.buf = buffer;
context.namespaces = NIL;
@ -669,7 +666,7 @@ pg_get_rule_expr(Node *expression)
get_rule_expr(expression, &context, showImplicitCasts);
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
return buffer->data;
}
@ -2052,8 +2049,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
deparse_context context;
deparse_namespace dpns;
OverrideSearchPath *overridePath = NULL;
/* Guard against excessively long or deeply-nested queries */
CHECK_FOR_INTERRUPTS();
check_stack_depth();
@ -2072,12 +2067,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
* PushEmptySearchPath().
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
int saveNestLevel = PushEmptySearchPath();
context.buf = buf;
context.namespaces = lcons(&dpns, list_copy(parentnamespace));
@ -2132,7 +2124,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
}
/* ----------

View File

@ -133,6 +133,19 @@ typedef struct ShardIdCacheEntry
int shardIndex;
} ShardIdCacheEntry;
/*
* ExtensionCreatedState is used to track whether the citus extension has been
* created via the CREATE EXTENSION command.
* UNKNOWN    : MetadataCache is invalid; the state has not been determined yet.
* CREATED    : the citus extension has been created.
* NOTCREATED : the citus extension has not been created.
*/
typedef enum ExtensionCreatedState
{
UNKNOWN = 0,
CREATED = 1,
NOTCREATED = 2,
} ExtensionCreatedState;
/*
* State which should be cleared upon DROP EXTENSION. When the configuration
@ -140,7 +153,7 @@ typedef struct ShardIdCacheEntry
*/
typedef struct MetadataCacheData
{
bool extensionLoaded;
ExtensionCreatedState extensionCreatedState;
Oid distShardRelationId;
Oid distPlacementRelationId;
Oid distBackgroundJobRelationId;
@ -288,7 +301,6 @@ static void CreateDistTableCache(void);
static void CreateShardIdCache(void);
static void CreateDistObjectCache(void);
static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateConnParamsCacheCallback(Datum argument, Oid relationId);
@ -2186,17 +2198,31 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
*/
bool
CitusHasBeenLoaded(void)
{
if (!MetadataCache.extensionLoaded || creating_extension)
{
/*
* Refresh if we have not determined whether the extension has been
* loaded yet, or in case of ALTER EXTENSION since we want to treat
* Citus as "not loaded" during ALTER EXTENSION citus.
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
* since the objects used by the C code might not be there yet.
*/
bool extensionLoaded = CitusHasBeenLoadedInternal();
if (creating_extension)
{
Oid citusExtensionOid = get_extension_oid("citus", true);
if (extensionLoaded && !MetadataCache.extensionLoaded)
if (CurrentExtensionObject == citusExtensionOid)
{
return false;
}
}
/*
* If extensionCreatedState is UNKNOWN, query pg_extension for Citus
* and cache the result. Otherwise, return what the cached state indicates.
*/
if (MetadataCache.extensionCreatedState == UNKNOWN)
{
bool extensionCreated = CitusHasBeenLoadedInternal();
if (extensionCreated)
{
/*
* Loaded Citus for the first time in this session, or first time after
@ -2208,31 +2234,22 @@ CitusHasBeenLoaded(void)
*/
StartupCitusBackend();
/*
* InvalidateDistRelationCacheCallback resets state such as extensionLoaded
* when it notices changes to pg_dist_partition (which usually indicate
* `DROP EXTENSION citus;` has been run)
*
* Ensure InvalidateDistRelationCacheCallback will notice those changes
* by caching pg_dist_partition's oid.
*
* We skip these checks during upgrade since pg_dist_partition is not
* present during early stages of upgrade operation.
*/
DistPartitionRelationId();
/*
* This needs to be initialized so we can receive foreign relation graph
* invalidation messages in InvalidateForeignRelationGraphCacheCallback().
* See the comments of InvalidateForeignKeyGraph for more context.
*/
DistColocationRelationId();
MetadataCache.extensionCreatedState = CREATED;
}
else
{
MetadataCache.extensionCreatedState = NOTCREATED;
}
}
MetadataCache.extensionLoaded = extensionLoaded;
}
return MetadataCache.extensionLoaded;
return (MetadataCache.extensionCreatedState == CREATED) ? true : false;
}
@ -2257,15 +2274,6 @@ CitusHasBeenLoadedInternal(void)
return false;
}
if (creating_extension && CurrentExtensionObject == citusExtensionOid)
{
/*
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
* since the objects used by the C code might be not be there yet.
*/
return false;
}
/* citus extension exists and has been created */
return true;
}
@ -4201,10 +4209,6 @@ InitializeDistCache(void)
CreateShardIdCache();
InitializeDistObjectCache();
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0);
}
@ -4754,7 +4758,7 @@ InvalidateForeignKeyGraph(void)
* InvalidateDistRelationCacheCallback flushes cache entries when a relation
* is updated (or flushes the entire cache).
*/
static void
void
InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
{
/* invalidate either entire cache or a specific entry */
@ -4762,12 +4766,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
{
InvalidateDistTableCache();
InvalidateDistObjectCache();
InvalidateMetadataSystemCache();
}
else
{
void *hashKey = (void *) &relationId;
bool foundInCache = false;
if (DistTableCacheHash == NULL)
{
return;
}
CitusTableCacheEntrySlot *cacheSlot =
hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache);
if (foundInCache)
@ -4776,21 +4786,19 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
}
/*
* If pg_dist_partition is being invalidated drop all state
* This happens pretty rarely, but most importantly happens during
* DROP EXTENSION citus; This isn't the only time when this happens
* though, it can happen for multiple other reasons, such as an
* autovacuum running ANALYZE on pg_dist_partition. Such an ANALYZE
* wouldn't really need a full Metadata cache invalidation, but we
* don't know how to differentiate between DROP EXTENSION and ANALYZE.
* So for now we simply drop it in both cases and take the slight
* temporary performance hit.
* If the pg_dist_partition relcache is invalidated for some reason, invalidate
* the whole MetadataCache. Invalidating the entire cache here is likely
* overkill, but until a better fix we keep it this way for the postgres
* regression tests that include the
* REINDEX SCHEMA CONCURRENTLY pg_catalog
* command.
*/
if (relationId == MetadataCache.distPartitionRelationId)
{
InvalidateMetadataSystemCache();
}
if (relationId == MetadataCache.distObjectRelationId)
{
InvalidateDistObjectCache();
@ -4830,6 +4838,11 @@ InvalidateDistTableCache(void)
CitusTableCacheEntrySlot *cacheSlot = NULL;
HASH_SEQ_STATUS status;
if (DistTableCacheHash == NULL)
{
return;
}
hash_seq_init(&status, DistTableCacheHash);
while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
@ -4848,6 +4861,11 @@ InvalidateDistObjectCache(void)
DistObjectCacheEntry *cacheEntry = NULL;
HASH_SEQ_STATUS status;
if (DistObjectCacheHash == NULL)
{
return;
}
hash_seq_init(&status, DistObjectCacheHash);
while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL)
@ -4930,8 +4948,8 @@ CreateDistObjectCache(void)
/*
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
* and invalidates the worker node, ConnParams, and local group ID caches.
* InvalidateMetadataSystemCache resets all the cached OIDs and the
* extensionCreatedState, and invalidates the worker node, ConnParams, and local
* group ID caches.
*/
void
InvalidateMetadataSystemCache(void)
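As a standalone illustration (hypothetical helper names, assuming the usual PostgreSQL headers), the tri-state scheme above boils down to memoizing a single catalog lookup and letting the relcache invalidation callback reset the memo:

#include "postgres.h"

typedef enum { UNKNOWN, CREATED, NOTCREATED } ExtensionCreatedState;

static ExtensionCreatedState extensionCreatedState = UNKNOWN;

/* stand-in for CitusHasBeenLoadedInternal: a real pg_extension lookup */
static bool LookUpExtensionInCatalog(void);

static bool
ExtensionHasBeenCreated(void)
{
	if (extensionCreatedState == UNKNOWN)
	{
		/* query the catalog once and remember the answer */
		extensionCreatedState =
			LookUpExtensionInCatalog() ? CREATED : NOTCREATED;
	}

	return extensionCreatedState == CREATED;
}

/*
 * The relcache invalidation callback flips the state back to UNKNOWN,
 * forcing a fresh catalog lookup after CREATE/DROP EXTENSION.
 */
static void
OnRelcacheInvalidation(Datum argument, Oid relationId)
{
	extensionCreatedState = UNKNOWN;
}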

View File

@ -612,7 +612,7 @@ GetPreLoadTableCreationCommands(Oid relationId,
{
List *tableDDLEventList = NIL;
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
/* fetch table schema and column option definitions */
char *tableSchemaDef = pg_get_tableschemadef_string(relationId,
@ -665,7 +665,7 @@ GetPreLoadTableCreationCommands(Oid relationId,
tableDDLEventList = list_concat(tableDDLEventList, policyCommands);
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
return tableDDLEventList;
}
@ -754,7 +754,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE
int indexFlags)
{
/* generate fully-qualified names */
PushOverrideEmptySearchPath(CurrentMemoryContext);
int saveNestLevel = PushEmptySearchPath();
Oid indexId = indexForm->indexrelid;
bool indexImpliedByConstraint = IndexImpliedByAConstraint(indexForm);
@ -805,7 +805,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE
}
/* revert back to original search_path */
PopOverrideSearchPath();
PopEmptySearchPath(saveNestLevel);
}

View File

@ -158,7 +158,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId)
* local session because changes made to shards are allowed for Citus internal
* backends anyway.
*/
int save_nestlevel = NewGUCNestLevel();
int saveNestLevel = NewGUCNestLevel();
SetLocalEnableLocalReferenceForeignKeys(false);
SetLocalEnableManualChangesToShard(true);
@ -184,7 +184,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId)
bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
AtEOXact_GUC(true, save_nestlevel);
AtEOXact_GUC(true, saveNestLevel);
}

View File

@ -109,6 +109,8 @@
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
@ -554,6 +556,9 @@ _PG_init(void)
"ColumnarSupportsIndexAM",
true, &handle);
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0);
INIT_COLUMNAR_SYMBOL(CompressionTypeStr_type, CompressionTypeStr);
INIT_COLUMNAR_SYMBOL(IsColumnarTableAmTable_type, IsColumnarTableAmTable);
INIT_COLUMNAR_SYMBOL(ReadColumnarOptions_type, ReadColumnarOptions);

View File

@ -19,6 +19,8 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "common/hashfn.h"
#include "distributed/backend_data.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
@ -30,6 +32,7 @@
#include "distributed/local_executor.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata/dependency.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_explain.h"
@ -89,14 +92,25 @@ StringInfo activeSetStmts;
* Though a list, we treat this as a stack, pushing on subxact contexts whenever
* e.g. a SAVEPOINT is executed (though this is actually performed by providing
* PostgreSQL with a sub-xact callback). At present, the context of a subxact
* includes a subxact identifier as well as any SET LOCAL statements propagated
* to workers during the sub-transaction.
* includes
* - a subxact identifier,
* - any SET LOCAL statements propagated to workers during the sub-transaction,
* - all objects propagated to workers during the sub-transaction.
*
* To be clear, the last item of the activeSubXactContexts list corresponds to
* the top of the stack.
*/
static List *activeSubXactContexts = NIL;
/*
* PropagatedObjectsInTx is a set of objects propagated in the root transaction.
* We also keep track of objects propagated in sub-transactions in activeSubXactContexts.
* When a sub-transaction commits, the objects propagated during it are moved to
* the parent transaction's set; when a sub-transaction aborts, its objects are
* discarded.
*/
static HTAB *PropagatedObjectsInTx = NULL;
/* some pre-allocated memory so we don't need to call malloc() during callbacks */
MemoryContext CitusXactCallbackContext = NULL;
@ -142,11 +156,17 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
/* remaining functions */
static void AdjustMaxPreparedTransactions(void);
static void PushSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId, bool commit);
static void ResetGlobalVariables(void);
static bool SwallowErrors(void (*func)(void));
static void ForceAllInProgressConnectionsToClose(void);
static void EnsurePrepareTransactionIsAllowed(void);
static HTAB * CurrentTransactionPropagatedObjects(bool readonly);
static HTAB * ParentTransactionPropagatedObjects(bool readonly);
static void MovePropagatedObjectsToParentTransaction(void);
static bool DependencyInPropagatedObjectsHash(HTAB *propagatedObjects,
const ObjectAddress *dependency);
static HTAB * CreateTxPropagatedObjectsHash(void);
/*
@ -321,6 +341,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
ResetGlobalVariables();
ResetRelationAccessHash();
ResetPropagatedObjects();
/*
* Make sure that we give the shared connections back to the shared
@ -391,6 +412,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
ResetGlobalVariables();
ResetRelationAccessHash();
ResetPropagatedObjects();
/* Reset any local replication origin session since transaction has been aborted.*/
ResetReplicationOriginLocalSession();
@ -638,7 +660,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
switch (event)
{
/*
* Our subtransaction stack should be consistent with postgres' internal
* Our sub-transaction stack should be consistent with postgres' internal
* transaction stack. In case of subxact begin, postgres calls our
* callback after it has pushed the transaction into stack, so we have to
* do the same even if worker commands fail, so we PushSubXact() first.
@ -672,7 +694,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
{
CoordinatedRemoteTransactionsSavepointRelease(subId);
}
PopSubXact(subId);
PopSubXact(subId, true);
/* Set CachedDuringCitusCreation to one level lower to indicate that citus creation is done */
@ -706,7 +728,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
{
CoordinatedRemoteTransactionsSavepointRollback(subId);
}
PopSubXact(subId);
PopSubXact(subId, false);
/*
* Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus
@ -775,6 +797,9 @@ PushSubXact(SubTransactionId subId)
state->subId = subId;
state->setLocalCmds = activeSetStmts;
/* we lazily create the hashset when any object is propagated during the sub-transaction */
state->propagatedObjects = NULL;
/* append to list and reset active set stmts for upcoming sub-xact */
activeSubXactContexts = lappend(activeSubXactContexts, state);
activeSetStmts = makeStringInfo();
@ -783,7 +808,7 @@ PushSubXact(SubTransactionId subId)
/* PopSubXact pops subId from the stack of active sub-transactions. */
static void
PopSubXact(SubTransactionId subId)
PopSubXact(SubTransactionId subId, bool commit)
{
SubXactContext *state = llast(activeSubXactContexts);
@ -806,6 +831,16 @@ PopSubXact(SubTransactionId subId)
*/
activeSetStmts = state->setLocalCmds;
/*
* Keep the sub-transaction's propagated objects in the parent transaction
* if the sub-transaction committed. Otherwise, discard them.
*/
if (commit)
{
MovePropagatedObjectsToParentTransaction();
}
hash_destroy(state->propagatedObjects);
/*
* Free state to avoid memory leaks when we create subxacts for each row,
* e.g. in exception handling of UDFs.
@ -913,3 +948,227 @@ EnsurePrepareTransactionIsAllowed(void)
errmsg("cannot use 2PC in transactions involving "
"multiple servers")));
}
/*
* CurrentTransactionPropagatedObjects returns the objects propagated in the current
* sub-transaction, or in the root transaction if no sub-transaction exists.
*
* If readonly is true, it will not create the hash if it does not already
* exist in the current sub-transaction.
*/
static HTAB *
CurrentTransactionPropagatedObjects(bool readonly)
{
if (activeSubXactContexts == NIL)
{
/* hashset in the root transaction if there is no sub-transaction */
if (PropagatedObjectsInTx == NULL && !readonly)
{
/* lazily create hashset for root transaction, for mutating uses */
PropagatedObjectsInTx = CreateTxPropagatedObjectsHash();
}
return PropagatedObjectsInTx;
}
/* hashset in top level sub-transaction */
SubXactContext *state = llast(activeSubXactContexts);
if (state->propagatedObjects == NULL && !readonly)
{
/* lazily create hashset for sub-transaction, for mutating uses */
state->propagatedObjects = CreateTxPropagatedObjectsHash();
}
return state->propagatedObjects;
}
/*
* ParentTransactionPropagatedObjects returns the objects propagated in the parent
* transaction of the active sub-transaction, or in the root transaction if no
* sub-transaction exists.
*
* If readonly is true, it will not create the hash if it does not already
* exist in the target sub-transaction.
*/
static HTAB *
ParentTransactionPropagatedObjects(bool readonly)
{
int nestingLevel = list_length(activeSubXactContexts);
if (nestingLevel <= 1)
{
/*
* The parent is the root transaction when there is a single level of
* sub-transaction, or no sub-transaction at all.
*/
if (PropagatedObjectsInTx == NULL && !readonly)
{
/* lazily create hashset for root transaction, for mutating uses */
PropagatedObjectsInTx = CreateTxPropagatedObjectsHash();
}
return PropagatedObjectsInTx;
}
/* parent is upper sub-transaction */
Assert(nestingLevel >= 2);
SubXactContext *state = list_nth(activeSubXactContexts, nestingLevel - 2);
if (state->propagatedObjects == NULL && !readonly)
{
/* lazily create hashset for parent sub-transaction */
state->propagatedObjects = CreateTxPropagatedObjectsHash();
}
return state->propagatedObjects;
}
/*
* MovePropagatedObjectsToParentTransaction moves all objects propagated in the current
* sub-transaction to the parent transaction. This should only be called when
* there is an active sub-transaction.
*/
static void
MovePropagatedObjectsToParentTransaction(void)
{
Assert(llast(activeSubXactContexts) != NULL);
HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(true);
if (currentPropagatedObjects == NULL)
{
/* nothing to move */
return;
}
/*
* Only after we know we have objects to move into the parent do we get a handle on
* a guaranteed existing parent hash table. This makes sure that the parents only
* get populated once there are objects to be tracked.
*/
HTAB *parentPropagatedObjects = ParentTransactionPropagatedObjects(false);
HASH_SEQ_STATUS propagatedObjectsSeq;
hash_seq_init(&propagatedObjectsSeq, currentPropagatedObjects);
ObjectAddress *objectAddress = NULL;
while ((objectAddress = hash_seq_search(&propagatedObjectsSeq)) != NULL)
{
hash_search(parentPropagatedObjects, objectAddress, HASH_ENTER, NULL);
}
}
/*
* DependencyInPropagatedObjectsHash checks whether the given dependency is in the
* given hashset of propagated objects.
*/
static bool
DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, const
ObjectAddress *dependency)
{
if (propagatedObjects == NULL)
{
return false;
}
bool found = false;
hash_search(propagatedObjects, dependency, HASH_FIND, &found);
return found;
}
/*
* CreateTxPropagatedObjectsHash creates a hashset to keep track of the objects
* propagated in the current root transaction or sub-transaction.
*/
static HTAB *
CreateTxPropagatedObjectsHash(void)
{
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ObjectAddress);
info.entrysize = sizeof(ObjectAddress);
info.hash = tag_hash;
info.hcxt = CitusXactCallbackContext;
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
return hash_create("Tx Propagated Objects", 16, &info, hashFlags);
}
/*
* TrackPropagatedObject adds the given object to the set of objects propagated
* in the current sub-transaction.
*/
void
TrackPropagatedObject(const ObjectAddress *objectAddress)
{
HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(false);
hash_search(currentPropagatedObjects, objectAddress, HASH_ENTER, NULL);
}
/*
* TrackPropagatedTableAndSequences adds the given table and its sequences to the
* objects propagated in the current sub-transaction.
*/
void
TrackPropagatedTableAndSequences(Oid relationId)
{
/* track table */
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
TrackPropagatedObject(tableAddress);
/* track its sequences */
List *ownedSeqIdList = getOwnedSequences(relationId);
Oid ownedSeqId = InvalidOid;
foreach_oid(ownedSeqId, ownedSeqIdList)
{
ObjectAddress *seqAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*seqAddress, RelationRelationId, ownedSeqId);
TrackPropagatedObject(seqAddress);
}
}
/*
* ResetPropagatedObjects destroys the hashset of propagated objects in the root transaction.
*/
void
ResetPropagatedObjects(void)
{
hash_destroy(PropagatedObjectsInTx);
PropagatedObjectsInTx = NULL;
}
/*
* HasAnyDependencyInPropagatedObjects decides whether any dependency of the given
* object is propagated in the current transaction.
*/
bool
HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress)
{
List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencyList)
{
/* first search in root transaction */
if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency))
{
return true;
}
/* search in all nested sub-transactions */
if (activeSubXactContexts == NIL)
{
continue;
}
SubXactContext *state = NULL;
foreach_ptr(state, activeSubXactContexts)
{
if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency))
{
return true;
}
}
}
return false;
}
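Putting the tracking API above together, a hedged sketch of how a caller elsewhere in this commit consumes it (compare the ProcessUtilityInternal and EnsureDependenciesExistOnAllNodes hunks; the wrapper and SendCommandsViaSuperUserConnections below are illustrative only):

static void
PropagateTargetToWorkers(const ObjectAddress *target, List *ddlCommands)
{
	if (HasAnyDependencyInPropagatedObjects(target))
	{
		/*
		 * A dependency was created (and tracked) in this transaction, so
		 * reuse the current user's metadata connections where it is visible.
		 */
		SendCommandListToWorkersWithMetadata(ddlCommands);
	}
	else
	{
		/* otherwise a separate superuser connection per worker is safe */
		SendCommandsViaSuperUserConnections(ddlCommands); /* hypothetical */
	}

	/* record the target itself for later statements in this transaction */
	TrackPropagatedObject(target);
}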

View File

@ -135,6 +135,21 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
}
/*
* SendCommandListToWorkersWithMetadata sends all commands to all metadata workers
* with the current user. See `SendCommandToWorkersWithMetadata` for details.
*/
void
SendCommandListToWorkersWithMetadata(List *commands)
{
char *command = NULL;
foreach_ptr(command, commands)
{
SendCommandToWorkersWithMetadata(command);
}
}
/*
* TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
* TargetWorkerSet.

View File

@ -175,12 +175,11 @@ BreakColocation(Oid sourceRelationId)
*/
Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock);
uint32 newColocationId = GetNextColocationId();
bool localOnly = false;
UpdateRelationColocationGroup(sourceRelationId, newColocationId, localOnly);
uint32 oldColocationId = TableColocationId(sourceRelationId);
CreateColocationGroupForRelation(sourceRelationId);
/* if there is not any remaining table in the colocation group, delete it */
DeleteColocationGroupIfNoTablesBelong(sourceRelationId);
/* if no tables remain in the old colocation group, delete it */
DeleteColocationGroupIfNoTablesBelong(oldColocationId);
table_close(pgDistColocation, NoLock);
}

View File

@ -11,22 +11,33 @@
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/namespace_utils.h"
#include "utils/guc.h"
#include "utils/regproc.h"
/*
* PushOverrideEmptySearchPath pushes search_path to be NIL and sets addCatalog to
* true so that all objects outside of pg_catalog will be schema-prefixed.
* Afterwards, PopOverrideSearchPath can be used to revert the search_path back.
* We use the equivalent of a function SET option to allow the setting to
* persist for the exact duration of the transaction, guc.c takes care of
* undoing the setting on error.
*
* We set search_path to "pg_catalog" instead of "" to expose useful utilities.
*/
int
PushEmptySearchPath()
{
int saveNestLevel = NewGUCNestLevel();
(void) set_config_option("search_path", "pg_catalog",
PGC_USERSET, PGC_S_SESSION,
GUC_ACTION_SAVE, true, 0, false);
return saveNestLevel;
}
/*
* Restore the GUC variable search_path we set in PushEmptySearchPath
*/
void
PushOverrideEmptySearchPath(MemoryContext memoryContext)
PopEmptySearchPath(int saveNestLevel)
{
OverrideSearchPath *overridePath = GetOverrideSearchPath(memoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
AtEOXact_GUC(true, saveNestLevel);
}
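A minimal sketch of the call pattern the new pair enables, mirroring how the deparse call sites across this commit were rewritten (the wrapper function is illustrative):

static char *
DeparseWithQualifiedNames(Node *expr)
{
	/* force schema qualification of all non-pg_catalog identifiers */
	int saveNestLevel = PushEmptySearchPath();

	char *exprSql = deparse_expression(expr, NIL, true, true);

	/* revert back to the original search_path */
	PopEmptySearchPath(saveNestLevel);

	return exprSql;
}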

View File

@ -137,6 +137,8 @@ typedef enum
ANY_CITUS_TABLE_TYPE
} CitusTableType;
void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
extern List * AllCitusTableIds(void);
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry);

View File

@ -10,6 +10,7 @@
#ifndef NAMESPACE_UTILS_H
#define NAMESPACE_UTILS_H
extern void PushOverrideEmptySearchPath(MemoryContext memoryContext);
extern int PushEmptySearchPath(void);
extern void PopEmptySearchPath(int saveNestLevel);
#endif /* NAMESPACE_UTILS_H */

View File

@ -10,11 +10,13 @@
#define TRANSACTION_MANAGMENT_H
#include "access/xact.h"
#include "catalog/objectaddress.h"
#include "lib/ilist.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "lib/stringinfo.h"
#include "nodes/primnodes.h"
#include "utils/hsearch.h"
/* forward declare, to avoid recursive includes */
struct DistObjectCacheEntry;
@ -58,6 +60,7 @@ typedef struct SubXactContext
{
SubTransactionId subId;
StringInfo setLocalCmds;
HTAB *propagatedObjects;
} SubXactContext;
/*
@ -157,6 +160,11 @@ extern bool IsMultiStatementTransaction(void);
extern void EnsureDistributedTransactionId(void);
extern bool MaybeExecutingUDF(void);
/* functions for tracking the objects propagated in current transaction */
extern void TrackPropagatedObject(const ObjectAddress *objectAddress);
extern void TrackPropagatedTableAndSequences(Oid relationId);
extern void ResetPropagatedObjects(void);
extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress);
/* initialization function(s) */
extern void InitializeTransactionManagement(void);

View File

@ -73,6 +73,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
commandList);
extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToWorkersWithMetadata(List *commands);
extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void);
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,

View File

@ -429,6 +429,10 @@ PORT_UPPER_BOUND = 32768
next_port = PORT_LOWER_BOUND
def notice_handler(diag: psycopg.errors.Diagnostic):
print(f"{diag.severity}: {diag.message_primary}")
def cleanup_test_leftovers(nodes):
"""
Cleaning up test leftovers needs to be done in a specific order, because
@ -444,7 +448,7 @@ def cleanup_test_leftovers(nodes):
node.cleanup_publications()
for node in nodes:
node.cleanup_logical_replication_slots()
node.cleanup_replication_slots()
for node in nodes:
node.cleanup_schemas()
@ -526,10 +530,12 @@ class QueryRunner(ABC):
def conn(self, *, autocommit=True, **kwargs):
"""Open a psycopg connection to this server"""
self.set_default_connection_options(kwargs)
return psycopg.connect(
conn = psycopg.connect(
autocommit=autocommit,
**kwargs,
)
conn.add_notice_handler(notice_handler)
return conn
def aconn(self, *, autocommit=True, **kwargs):
"""Open an asynchronous psycopg connection to this server"""
@ -572,6 +578,21 @@ class QueryRunner(ABC):
with self.cur(**kwargs) as cur:
cur.execute(query, params=params)
def sql_row(self, query, params=None, allow_empty_result=False, **kwargs):
"""Run an SQL query that returns a single row and returns this row
This opens a new connection and closes it once the query is done
"""
with self.cur(**kwargs) as cur:
cur.execute(query, params=params)
result = cur.fetchall()
if allow_empty_result and len(result) == 0:
return None
assert len(result) == 1, "sql_row expected exactly one row"
return result[0]
def sql_value(self, query, params=None, allow_empty_result=False, **kwargs):
"""Run an SQL query that returns a single cell and return this value
@ -731,7 +752,7 @@ class Postgres(QueryRunner):
# Used to track objects that we want to clean up at the end of a test
self.subscriptions = set()
self.publications = set()
self.logical_replication_slots = set()
self.replication_slots = set()
self.schemas = set()
self.users = set()
@ -983,7 +1004,7 @@ class Postgres(QueryRunner):
def create_logical_replication_slot(
self, name, plugin, temporary=False, twophase=False
):
self.logical_replication_slots.add(name)
self.replication_slots.add(name)
self.sql(
"SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)",
(name, plugin, temporary, twophase),
@ -1015,12 +1036,21 @@ class Postgres(QueryRunner):
)
)
def cleanup_logical_replication_slots(self):
for slot in self.logical_replication_slots:
def cleanup_replication_slots(self):
for slot in self.replication_slots:
start = time.time()
while True:
try:
self.sql(
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s",
(slot,),
)
except psycopg.errors.ObjectInUse:
if time.time() < start + 10:
time.sleep(0.5)
continue
raise
break
def cleanup_subscriptions(self):
for subscription in self.subscriptions:

View File

@ -168,6 +168,7 @@ DEPS = {
],
),
"grant_on_schema_propagation": TestDeps("minimal_schedule"),
"propagate_extension_commands": TestDeps("minimal_schedule"),
}

View File

@ -0,0 +1,44 @@
import psycopg
import pytest
def test_create_drop_citus(coord):
with coord.cur() as cur1:
with coord.cur() as cur2:
# Conn1 drops the extension
# and Conn2 cannot use it.
cur1.execute("DROP EXTENSION citus")
with pytest.raises(psycopg.errors.UndefinedFunction):
# Conn1 dropped the extension, so the citus_version udf
# cannot be found. psycopg.errors.UndefinedFunction
# is expected here.
cur2.execute("SELECT citus_version();")
# Conn2 creates the extension,
# Conn1 is able to use it immediately.
cur2.execute("CREATE EXTENSION citus")
cur1.execute("SELECT citus_version();")
cur1.execute("DROP EXTENSION citus;")
with coord.cur() as cur1:
with coord.cur() as cur2:
# A connection is able to create and use the extension
# within a transaction block.
cur1.execute("BEGIN;")
cur1.execute("CREATE TABLE t1(id int);")
cur1.execute("CREATE EXTENSION citus;")
cur1.execute("SELECT create_reference_table('t1')")
cur1.execute("ABORT;")
# Conn1 aborted, so Conn2 is able to create and
# use the extension within a transaction block.
cur2.execute("BEGIN;")
cur2.execute("CREATE TABLE t1(id int);")
cur2.execute("CREATE EXTENSION citus;")
cur2.execute("SELECT create_reference_table('t1')")
cur2.execute("COMMIT;")
# Conn2 committed, so Conn1 is able to use the
# extension immediately.
cur1.execute("SELECT citus_version();")

View File

@ -947,3 +947,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist;
NOTICE: type "domain_does_not_exist" does not exist, skipping
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE;
DROP ROLE domain_owner;

View File

@ -19,7 +19,7 @@ step s1-rebalance-c1-block-writes:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -40,8 +40,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
(4 rows)
@ -63,7 +63,7 @@ rebalance_table_shards
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -102,7 +102,7 @@ step s1-rebalance-c1-block-writes:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -123,8 +123,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 2|move |t |t |f |Completed
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 2|move |t |t |f |Completed
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 2|move |t |t |f |Completed
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 2|move |t |t |f |Completed
colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f |Setting Up
colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f |Setting Up
(4 rows)
@ -141,7 +141,7 @@ rebalance_table_shards
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -184,7 +184,7 @@ step s1-rebalance-c1-block-writes:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -205,8 +205,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f |Copying Data
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f |Copying Data
colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
(4 rows)
@ -228,7 +228,7 @@ rebalance_table_shards
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -271,7 +271,7 @@ step s1-rebalance-c1-online:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -292,8 +292,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
(4 rows)
@ -315,7 +315,7 @@ rebalance_table_shards
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -358,7 +358,7 @@ step s1-rebalance-c1-online:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -379,8 +379,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |t |Final Catchup
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |t |Final Catchup
colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet
(4 rows)
@ -402,7 +402,7 @@ rebalance_table_shards
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -445,7 +445,7 @@ step s1-shard-move-c1-block-writes:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -466,8 +466,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data
(2 rows)
step s5-release-advisory-lock:
@ -487,7 +487,7 @@ citus_move_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -530,7 +530,7 @@ step s1-shard-move-c1-block-writes:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -551,8 +551,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f |Copying Data
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f |Copying Data
(2 rows)
step s6-release-advisory-lock:
@ -572,7 +572,7 @@ citus_move_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -616,7 +616,7 @@ step s1-shard-copy-c1-block-writes:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -637,8 +637,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data
(2 rows)
step s5-release-advisory-lock:
@ -658,7 +658,7 @@ citus_copy_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -702,7 +702,7 @@ step s1-shard-copy-c1-block-writes:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -723,8 +723,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |f |Copying Data
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |f |Copying Data
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|copy |t |t |f |Copying Data
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|copy |t |t |f |Copying Data
(2 rows)
step s6-release-advisory-lock:
@ -744,7 +744,7 @@ citus_copy_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -787,7 +787,7 @@ step s1-shard-move-c1-online:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -808,8 +808,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up
(2 rows)
step s5-release-advisory-lock:
@ -829,7 +829,7 @@ citus_move_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -872,7 +872,7 @@ step s1-shard-move-c1-online:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -893,8 +893,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |t |Final Catchup
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |t |Final Catchup
(2 rows)
step s6-release-advisory-lock:
@ -914,7 +914,7 @@ citus_move_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -958,7 +958,7 @@ step s1-shard-copy-c1-online:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -979,8 +979,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up
(2 rows)
step s5-release-advisory-lock:
@ -1000,7 +1000,7 @@ citus_copy_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -1044,7 +1044,7 @@ step s1-shard-copy-c1-online:
<waiting ...>
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -1065,8 +1065,8 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |t |Final Catchup
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |t |Final Catchup
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|copy |t |t |t |Final Catchup
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|copy |t |t |t |Final Catchup
(2 rows)
step s6-release-advisory-lock:
@ -1086,7 +1086,7 @@ citus_copy_shard_placement
step s1-wait:
step s7-get-progress:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -1132,7 +1132,7 @@ step s4-shard-move-sep-block-writes:
<waiting ...>
step s7-get-progress-ordered:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -1153,9 +1153,9 @@ step s7-get-progress-ordered:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f
separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f
separate |1500009| 200000|localhost | 57637| 200000|localhost | 57638| 8000| 1|move |t |t |f
(3 rows)
step s5-release-advisory-lock:
@ -1182,7 +1182,7 @@ step s1-wait:
step s4-wait:
step s7-get-progress-ordered:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -1228,7 +1228,7 @@ step s4-shard-move-sep-block-writes:
<waiting ...>
step s7-get-progress-ordered:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -1249,9 +1249,9 @@ step s7-get-progress-ordered:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available
---------------------------------------------------------------------
colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f
colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f
separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f
colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f
colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f
separate |1500009| 200000|localhost | 57637| 200000|localhost | 57638| 200000| 1|move |t |t |f
(3 rows)
step s6-release-advisory-lock:
@ -1278,7 +1278,7 @@ step s1-wait:
step s4-wait:
step s7-get-progress-ordered:
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,


@ -1393,6 +1393,284 @@ BEGIN;
ALTER SCHEMA bar RENAME TO foo;
ROLLBACK;
-- below tests are to verify dependency propagation with nested sub-transactions
-- TEST1
BEGIN;
CREATE SCHEMA sc1;
CREATE SEQUENCE sc1.seq;
CREATE TABLE sc1.s1(id int default(nextval('sc1.seq')));
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to sequence sc1.seq
drop cascades to table sc1.s1
-- TEST2
CREATE SCHEMA sc1;
BEGIN;
CREATE SEQUENCE sc1.seq1;
CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1')));
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to sequence sc1.seq1
drop cascades to table sc1.s1
-- TEST3
SET citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET citus.enable_metadata_sync TO on;
BEGIN;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
-- TEST4
BEGIN;
SAVEPOINT sp1;
CREATE SCHEMA sc1;
ROLLBACK TO SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
-- TEST5
BEGIN;
SAVEPOINT sp1;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
DROP SEQUENCE seq1;
-- TEST6
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc1;
ROLLBACK TO SAVEPOINT sp2;
RELEASE SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
-- TEST7
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp2;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
DROP SEQUENCE seq1;
-- TEST8
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp2;
ROLLBACK TO SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
-- TEST9
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
ROLLBACK TO SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
DROP SEQUENCE seq1;
-- TEST10
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
RELEASE SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc3;
SAVEPOINT sp4;
CREATE SCHEMA sc1;
ROLLBACK TO SAVEPOINT sp4;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
-- TEST11
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
RELEASE SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc3;
SAVEPOINT sp4;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp4;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SEQUENCE seq1;
-- TEST12
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
RELEASE SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc3;
SAVEPOINT sp4;
CREATE SEQUENCE seq1;
CREATE SCHEMA sc1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
RELEASE SAVEPOINT sp4;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
COMMIT;
DROP SCHEMA sc1 CASCADE;
NOTICE: drop cascades to table sc1.s1
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SEQUENCE seq1;
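All twelve tests above exercise one pattern: create_distributed_table must propagate any dependency (a schema, a sequence) that was created earlier in the same transaction, even when that creation happened under savepoints that were later released or rolled back. A minimal sketch of the shape the passing variants share (names hypothetical):

BEGIN;
SAVEPOINT sp;
CREATE SCHEMA sc;        -- dependency created inside a sub-transaction
RELEASE SAVEPOINT sp;    -- the ROLLBACK TO variants instead recreate the schema
                         -- with citus.enable_metadata_sync off
CREATE TABLE sc.t (id int);
SELECT create_distributed_table('sc.t', 'id');  -- must ship sc to the workers first
COMMIT;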
-- issue-6614
CREATE FUNCTION create_schema_test() RETURNS void AS $$
BEGIN
SET citus.create_object_propagation = 'deferred';
CREATE SCHEMA test_1;
CREATE TABLE test_1.test (
id bigserial constraint test_pk primary key,
creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null
);
PERFORM create_reference_table('test_1.test');
RETURN;
END;
$$ LANGUAGE plpgsql;
SELECT create_schema_test();
create_schema_test
---------------------------------------------------------------------
(1 row)
SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
DROP FUNCTION create_schema_test;
DROP SCHEMA test_1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table test_1.test
drop cascades to table test_1.test_1197064
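The issue-6614 repro above hinges on citus.create_object_propagation = 'deferred', which delays shipping DDL for new objects to the workers until they are first needed, here the PERFORM create_reference_table call inside the function. A session-level illustration of the GUC (a sketch, not part of the test):

SET citus.create_object_propagation TO 'deferred';  -- propagate lazily, on first use
-- other accepted values: 'immediate' and 'automatic'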
-- Clean up the created schema
SET client_min_messages TO WARNING;
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object


@ -314,6 +314,80 @@ SELECT result FROM run_command_on_workers
(2 rows)
SET search_path TO pg16;
SET citus.next_shard_id TO 951000;
-- Foreign table TRUNCATE trigger
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/3b00a94
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
SET citus.use_citus_managed_tables TO ON;
CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial);
INSERT INTO foreign_table_test VALUES (1, 'text_test');
CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
CREATE USER MAPPING FOR CURRENT_USER
SERVER foreign_server
OPTIONS (user 'postgres');
CREATE FOREIGN TABLE foreign_table (
id integer NOT NULL,
data text,
a bigserial
)
SERVER foreign_server
OPTIONS (schema_name 'pg16', table_name 'foreign_table_test');
-- verify it's a Citus foreign table
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
partmethod | repmodel
---------------------------------------------------------------------
n | s
(1 row)
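For readers of pg_dist_partition: partmethod 'n' means the relation has no distribution column (as with this Citus-managed foreign table) and repmodel 's' denotes streaming replication. The same catalog query works for checking any table's classification:

SELECT logicalrelid::regclass, partmethod, repmodel
  FROM pg_dist_partition
 ORDER BY logicalrelid;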
INSERT INTO foreign_table VALUES (2, 'test_2');
INSERT INTO foreign_table_test VALUES (3, 'test_3');
CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %',
TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL;
RETURN NULL;
END;$$;
CREATE FUNCTION trigger_func_on_shard() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE 'trigger_func_on_shard(%) called: action = %, when = %, level = %',
TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL;
RETURN NULL;
END;$$;
CREATE TRIGGER trig_stmt_before BEFORE TRUNCATE ON foreign_table
FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func();
SET citus.override_table_visibility TO off;
CREATE TRIGGER trig_stmt_shard_before BEFORE TRUNCATE ON foreign_table_951001
FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func_on_shard();
RESET citus.override_table_visibility;
SELECT * FROM foreign_table ORDER BY 1;
id | data | a
---------------------------------------------------------------------
1 | text_test | 1
2 | test_2 | 1
3 | test_3 | 2
(3 rows)
TRUNCATE foreign_table;
NOTICE: trigger_func(<NULL>) called: action = TRUNCATE, when = BEFORE, level = STATEMENT
CONTEXT: PL/pgSQL function trigger_func() line XX at RAISE
NOTICE: trigger_func_on_shard(<NULL>) called: action = TRUNCATE, when = BEFORE, level = STATEMENT
CONTEXT: PL/pgSQL function trigger_func_on_shard() line XX at RAISE
SELECT * FROM foreign_table ORDER BY 1;
id | data | a
---------------------------------------------------------------------
(0 rows)
RESET citus.use_citus_managed_tables;
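For context, the PG16 feature exercised above (commit 3b00a94) is statement-level TRUNCATE triggers on foreign tables, which earlier releases reject at CREATE TRIGGER time. A minimal standalone sketch, assuming a PG16 server and an existing postgres_fdw foreign table ft (both names hypothetical):

CREATE FUNCTION note_truncate() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
    RAISE NOTICE 'TRUNCATE fired on %', TG_TABLE_NAME;
    RETURN NULL;
END; $$;
-- Fails on PG15 and earlier; on PG16 the trigger fires once per TRUNCATE statement.
CREATE TRIGGER ft_trunc_before BEFORE TRUNCATE ON ft
    FOR EACH STATEMENT EXECUTE FUNCTION note_truncate();
TRUNCATE ft;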
--
-- COPY FROM ... DEFAULT
-- Already supported in Citus, adding all PG tests with a distributed table
@ -676,6 +750,62 @@ SELECT result FROM run_command_on_workers
REINDEX
(2 rows)
--
-- random_normal() to provide normally-distributed random numbers
-- here we add the same tests as the ones that use random() in aggregate_support.sql
-- Relevant PG commit: https://github.com/postgres/postgres/commit/38d8176
--
CREATE TABLE dist_table (dist_col int, agg_col numeric);
SELECT create_distributed_table('dist_table', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_table (int_col int);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- Test the cases where the worker aggregate execution returns no tuples.
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random_normal() FROM dist_table) a;
percentile_disc
---------------------------------------------------------------------
(1 row)
SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10)
WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
percentile_disc
---------------------------------------------------------------------
(1 row)
-- run the same queries after loading some data
INSERT INTO dist_table VALUES (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25),
(4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19);
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random_normal() FROM dist_table) a;
percentile_disc
---------------------------------------------------------------------
3.22
(1 row)
SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10)
WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
percentile_disc
---------------------------------------------------------------------
1.19
(1 row)
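random_normal() is new in PG16 (commit 38d8176) and returns a normally-distributed double; both parameters are optional and may be passed by name. A quick sketch of the signature, assuming a PG16 server:

SELECT random_normal();                         -- defaults: mean 0, stddev 1
SELECT random_normal(mean => 10, stddev => 2);  -- shifted and scaled sample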
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP EXTENSION postgres_fdw CASCADE;
DROP SCHEMA pg16 CASCADE;


@ -375,6 +375,158 @@ END;
CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest;
RESET citus.create_object_propagation;
DROP SCHEMA deptest CASCADE;
--
-- PG16 allows publications with schema and table of the same schema.
-- backpatched to PG15
-- Relevant PG commit: https://github.com/postgres/postgres/commit/13a185f
--
CREATE SCHEMA publication2;
CREATE TABLE publication2.test1 (id int);
SELECT create_distributed_table('publication2.test1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- should be able to create publication with schema and table of the same
-- schema
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
CREATE TABLE publication.test2 (id int);
SELECT create_distributed_table('publication.test2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication.test2;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication.test2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- should be able to have both the publication2 schema and its new table test2 in the testpub_for_tbl_schema publication
ALTER TABLE test2 SET SCHEMA publication2;
-- should be able to add a table of the same schema to the schema publication
CREATE TABLE publication2.test3 (x int primary key, y int, "column-1" int);
SELECT create_distributed_table('publication2.test3', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication2.test2, TABLE publication2.test3 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- should be able to drop the table
ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication2.test2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
DROP PUBLICATION testpub_for_tbl_schema;
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2;
-- should be able to set publication with schema and table of the same schema
ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE (id < 99);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE ((test1.id < 99)) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- test that using a column list for a table is disallowed if any schemas are
-- part of the publication
DROP PUBLICATION testpub_for_tbl_schema;
-- failure - cannot use column list and schema together
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test3(y);
ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema"
DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements.
-- ok - only publish schema
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- failure - add a table with column list when there is already a schema in the
-- publication
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3(y);
ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema"
DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements.
-- ok - only publish table with column list
ALTER PUBLICATION testpub_for_tbl_schema SET TABLE publication2.test3(y);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLE publication2.test3 (y) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- failure - specify a schema when there is already a column list in the
-- publication
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2;
ERROR: cannot add schema to publication "testpub_for_tbl_schema"
DETAIL: Schemas cannot be added if any tables that specify a column list are already part of the publication.
-- failure - cannot SET column list and schema together
ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test3(y);
ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema"
DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements.
-- ok - drop table
ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
c
---------------------------------------------------------------------
SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')');
(1 row)
-- failure - cannot ADD column list and schema together
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2, TABLE publication2.test3(y);
ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema"
DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements.
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
@ -386,7 +538,9 @@ DROP PUBLICATION pubdep;
DROP PUBLICATION "pub-mix";
DROP PUBLICATION pubtables;
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION testpub_for_tbl_schema;
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
DROP SCHEMA citus_schema_1 CASCADE;
DROP SCHEMA publication2 CASCADE;
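For reference, the core PG16 change this block propagates (commit 13a185f, backpatched to PG15) is that a publication may list a schema together with a table from that same schema, as long as no column list is involved. A minimal sketch against a plain PostgreSQL server (schema and table names hypothetical):

CREATE SCHEMA app;
CREATE TABLE app.events (id int PRIMARY KEY);
-- Accepted on PG16 and patched PG15; unpatched servers reject mixing a schema
-- with one of its own tables in the same publication.
CREATE PUBLICATION pub_mixed FOR TABLES IN SCHEMA app, TABLE app.events;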


@ -131,7 +131,7 @@ session "s7"
step "s7-get-progress"
{
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,
@ -157,7 +157,7 @@ step "s7-get-progress"
step "s7-get-progress-ordered"
{
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000))
SELECT
table_name,
shardid,


@ -487,3 +487,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist;
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE;
DROP ROLE domain_owner;


@ -995,6 +995,219 @@ BEGIN;
ALTER SCHEMA bar RENAME TO foo;
ROLLBACK;
-- below tests are to verify dependency propagation with nested sub-transactions
-- TEST1
BEGIN;
CREATE SCHEMA sc1;
CREATE SEQUENCE sc1.seq;
CREATE TABLE sc1.s1(id int default(nextval('sc1.seq')));
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
-- TEST2
CREATE SCHEMA sc1;
BEGIN;
CREATE SEQUENCE sc1.seq1;
CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1')));
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
-- TEST3
SET citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET citus.enable_metadata_sync TO on;
BEGIN;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
-- TEST4
BEGIN;
SAVEPOINT sp1;
CREATE SCHEMA sc1;
ROLLBACK TO SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
-- TEST5
BEGIN;
SAVEPOINT sp1;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
DROP SEQUENCE seq1;
-- TEST6
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc1;
ROLLBACK TO SAVEPOINT sp2;
RELEASE SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
-- TEST7
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp2;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
DROP SEQUENCE seq1;
-- TEST8
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp2;
ROLLBACK TO SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
-- TEST9
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
ROLLBACK TO SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
DROP SEQUENCE seq1;
-- TEST10
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
RELEASE SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc3;
SAVEPOINT sp4;
CREATE SCHEMA sc1;
ROLLBACK TO SAVEPOINT sp4;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
SET LOCAL citus.enable_metadata_sync TO off;
CREATE SCHEMA sc1;
SET LOCAL citus.enable_metadata_sync TO on;
CREATE TABLE sc1.s1(id int);
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
-- TEST11
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
RELEASE SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc3;
SAVEPOINT sp4;
CREATE SCHEMA sc1;
RELEASE SAVEPOINT sp4;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
CREATE SEQUENCE seq1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
COMMIT;
DROP SCHEMA sc1 CASCADE;
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SEQUENCE seq1;
-- TEST12
BEGIN;
SAVEPOINT sp1;
SAVEPOINT sp2;
CREATE SCHEMA sc2;
RELEASE SAVEPOINT sp2;
SAVEPOINT sp3;
CREATE SCHEMA sc3;
SAVEPOINT sp4;
CREATE SEQUENCE seq1;
CREATE SCHEMA sc1;
CREATE TABLE sc1.s1(id int default(nextval('seq1')));
SELECT create_distributed_table('sc1.s1','id');
RELEASE SAVEPOINT sp4;
RELEASE SAVEPOINT sp3;
RELEASE SAVEPOINT sp1;
COMMIT;
DROP SCHEMA sc1 CASCADE;
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SEQUENCE seq1;
-- issue-6614
CREATE FUNCTION create_schema_test() RETURNS void AS $$
BEGIN
SET citus.create_object_propagation = 'deferred';
CREATE SCHEMA test_1;
CREATE TABLE test_1.test (
id bigserial constraint test_pk primary key,
creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null
);
PERFORM create_reference_table('test_1.test');
RETURN;
END;
$$ LANGUAGE plpgsql;
SELECT create_schema_test();
SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$);
DROP FUNCTION create_schema_test;
DROP SCHEMA test_1 CASCADE;
-- Clean up the created schema
SET client_min_messages TO WARNING;


@ -146,6 +146,63 @@ DROP DATABASE test_db;
SELECT result FROM run_command_on_workers
($$DROP DATABASE test_db$$);
SET search_path TO pg16;
SET citus.next_shard_id TO 951000;
-- Foreign table TRUNCATE trigger
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/3b00a94
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
SET citus.use_citus_managed_tables TO ON;
CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial);
INSERT INTO foreign_table_test VALUES (1, 'text_test');
CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
CREATE USER MAPPING FOR CURRENT_USER
SERVER foreign_server
OPTIONS (user 'postgres');
CREATE FOREIGN TABLE foreign_table (
id integer NOT NULL,
data text,
a bigserial
)
SERVER foreign_server
OPTIONS (schema_name 'pg16', table_name 'foreign_table_test');
-- verify it's a Citus foreign table
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
INSERT INTO foreign_table VALUES (2, 'test_2');
INSERT INTO foreign_table_test VALUES (3, 'test_3');
CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %',
TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL;
RETURN NULL;
END;$$;
CREATE FUNCTION trigger_func_on_shard() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE 'trigger_func_on_shard(%) called: action = %, when = %, level = %',
TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL;
RETURN NULL;
END;$$;
CREATE TRIGGER trig_stmt_before BEFORE TRUNCATE ON foreign_table
FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func();
SET citus.override_table_visibility TO off;
CREATE TRIGGER trig_stmt_shard_before BEFORE TRUNCATE ON foreign_table_951001
FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func_on_shard();
RESET citus.override_table_visibility;
SELECT * FROM foreign_table ORDER BY 1;
TRUNCATE foreign_table;
SELECT * FROM foreign_table ORDER BY 1;
RESET citus.use_citus_managed_tables;
--
-- COPY FROM ... DEFAULT
@ -388,6 +445,42 @@ REINDEX SYSTEM;
SELECT result FROM run_command_on_workers
($$REINDEX SYSTEM$$);
--
-- random_normal() to provide normally-distributed random numbers
-- here we add the same tests as the ones that use random() in aggregate_support.sql
-- Relevant PG commit: https://github.com/postgres/postgres/commit/38d8176
--
CREATE TABLE dist_table (dist_col int, agg_col numeric);
SELECT create_distributed_table('dist_table', 'dist_col');
CREATE TABLE ref_table (int_col int);
SELECT create_reference_table('ref_table');
-- Test the cases where the worker aggregate execution returns no tuples.
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random_normal() FROM dist_table) a;
SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10)
WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
-- run the same queries after loading some data
INSERT INTO dist_table VALUES (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25),
(4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19);
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random_normal() FROM dist_table) a;
SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10)
WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP EXTENSION postgres_fdw CASCADE;
DROP SCHEMA pg16 CASCADE;


@ -273,6 +273,110 @@ CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest;
RESET citus.create_object_propagation;
DROP SCHEMA deptest CASCADE;
--
-- PG16 allows publications with schema and table of the same schema.
-- backpatched to PG15
-- Relevant PG commit: https://github.com/postgres/postgres/commit/13a185f
--
CREATE SCHEMA publication2;
CREATE TABLE publication2.test1 (id int);
SELECT create_distributed_table('publication2.test1', 'id');
-- should be able to create publication with schema and table of the same
-- schema
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
CREATE TABLE publication.test2 (id int);
SELECT create_distributed_table('publication.test2', 'id');
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication.test2;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
-- should be able to have both the publication2 schema and its new table test2 in the testpub_for_tbl_schema publication
ALTER TABLE test2 SET SCHEMA publication2;
-- should be able to add a table of the same schema to the schema publication
CREATE TABLE publication2.test3 (x int primary key, y int, "column-1" int);
SELECT create_distributed_table('publication2.test3', 'x');
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
-- should be able to drop the table
ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
DROP PUBLICATION testpub_for_tbl_schema;
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2;
-- should be able to set publication with schema and table of the same schema
ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE (id < 99);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
-- test that using a column list for a table is disallowed if any schemas are
-- part of the publication
DROP PUBLICATION testpub_for_tbl_schema;
-- failure - cannot use column list and schema together
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test3(y);
-- ok - only publish schema
CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
-- failure - add a table with column list when there is already a schema in the
-- publication
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3(y);
-- ok - only publish table with column list
ALTER PUBLICATION testpub_for_tbl_schema SET TABLE publication2.test3(y);
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
-- failure - specify a schema when there is already a column list in the
-- publication
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2;
-- failure - cannot SET column list and schema together
ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test3(y);
-- ok - drop table
ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3;
SELECT DISTINCT c FROM (
SELECT unnest(result::text[]) c
FROM run_command_on_workers($$
SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$)
ORDER BY c) s;
-- failure - cannot ADD column list and schema together
ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2, TABLE publication2.test3(y);
-- make sure we can sync all the publication metadata
SELECT start_metadata_sync_to_all_nodes();
@ -280,8 +384,10 @@ DROP PUBLICATION pubdep;
DROP PUBLICATION "pub-mix";
DROP PUBLICATION pubtables;
DROP PUBLICATION pubpartitioned;
DROP PUBLICATION testpub_for_tbl_schema;
SET client_min_messages TO ERROR;
DROP SCHEMA publication CASCADE;
DROP SCHEMA "publication-1" CASCADE;
DROP SCHEMA citus_schema_1 CASCADE;
DROP SCHEMA publication2 CASCADE;