Merge branch 'master' into velioglu/function_propagation

velioglu/tmpfuncprop
Burak Velioglu 2022-02-16 00:09:06 +03:00
commit bd2072397a
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
20 changed files with 208 additions and 297 deletions

View File

@ -32,6 +32,8 @@ why we ask this as well as instructions for how to proceed, see the
./configure ./configure
make make
make install make install
# Optionally, you might instead want to use `make install-all`
# since `multi_extension` regression test would fail due to missing downgrade scripts.
cd src/test/regress cd src/test/regress
make check make check
``` ```
@ -51,7 +53,7 @@ why we ask this as well as instructions for how to proceed, see the
autoconf flex git libcurl4-gnutls-dev libicu-dev \ autoconf flex git libcurl4-gnutls-dev libicu-dev \
libkrb5-dev liblz4-dev libpam0g-dev libreadline-dev \ libkrb5-dev liblz4-dev libpam0g-dev libreadline-dev \
libselinux1-dev libssl-dev libxslt1-dev libzstd-dev \ libselinux1-dev libssl-dev libxslt1-dev libzstd-dev \
make uuid-dev make uuid-dev mitmproxy
``` ```
2. Get, build, and test the code 2. Get, build, and test the code
@ -62,6 +64,8 @@ why we ask this as well as instructions for how to proceed, see the
./configure ./configure
make make
sudo make install sudo make install
# Optionally, you might instead want to use `sudo make install-all`
# since `multi_extension` regression test would fail due to missing downgrade scripts.
cd src/test/regress cd src/test/regress
make check make check
``` ```
@ -104,6 +108,8 @@ why we ask this as well as instructions for how to proceed, see the
PG_CONFIG=/usr/pgsql-14/bin/pg_config ./configure PG_CONFIG=/usr/pgsql-14/bin/pg_config ./configure
make make
sudo make install sudo make install
# Optionally, you might instead want to use `sudo make install-all`
# since `multi_extension` regression test would fail due to missing downgrade scripts.
cd src/test/regress cd src/test/regress
make check make check
``` ```
@ -125,6 +131,8 @@ cd build
cmake .. cmake ..
make -j5 make -j5
sudo make install sudo make install
# Optionally, you might instead want to use `sudo make install-all`
# since `multi_extension` regression test would fail due to missing downgrade scripts.
cd ../.. cd ../..
git clone https://github.com/citusdata/tools.git git clone https://github.com/citusdata/tools.git

View File

@ -37,7 +37,6 @@ static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner,
char **quotedCollationName); char **quotedCollationName);
static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok, static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok,
List **addresses); List **addresses);
static void EnsureSequentialModeForCollationDDL(void);
/* /*
@ -256,7 +255,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString,
char *dropStmtSql = DeparseTreeNode((Node *) stmt); char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = oldCollations; stmt->objects = oldCollations;
EnsureSequentialModeForCollationDDL(); EnsureSequentialMode(OBJECT_COLLATION);
/* to prevent recursion with mx we disable ddl propagation */ /* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
@ -292,7 +291,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString,
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
char *sql = DeparseTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialModeForCollationDDL(); EnsureSequentialMode(OBJECT_COLLATION);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
@ -328,7 +327,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString,
/* deparse sql*/ /* deparse sql*/
char *renameStmtSql = DeparseTreeNode((Node *) stmt); char *renameStmtSql = DeparseTreeNode((Node *) stmt);
EnsureSequentialModeForCollationDDL(); EnsureSequentialMode(OBJECT_COLLATION);
/* to prevent recursion with mx we disable ddl propagation */ /* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
@ -363,7 +362,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString,
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
char *sql = DeparseTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialModeForCollationDDL(); EnsureSequentialMode(OBJECT_COLLATION);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql, (void *) sql,
@ -453,47 +452,6 @@ AlterCollationSchemaStmtObjectAddress(Node *node, bool missing_ok)
} }
/*
* EnsureSequentialModeForCollationDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginning.
*
* As collations are node scoped objects there exists only 1 instance of the collation used by
* potentially multiple shards. To make sure all shards in the transaction can interact
* with the type the type needs to be visible on all connections used by the transaction,
* meaning we can only use 1 connection per node.
*/
static void
EnsureSequentialModeForCollationDDL(void)
{
if (!IsTransactionBlock())
{
/* we do not need to switch to sequential mode if we are not in a transaction */
return;
}
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot create or modify collation because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail("When creating or altering a collation, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail("Collation is created or altered. To make sure subsequent "
"commands see the collation correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}
/* /*
* GenerateBackupNameForCollationCollision generates a new collation name for an existing collation. * GenerateBackupNameForCollationCollision generates a new collation name for an existing collation.
* The name is generated in such a way that the new name doesn't overlap with an existing collation * The name is generated in such a way that the new name doesn't overlap with an existing collation

View File

@ -60,6 +60,7 @@
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h" #include "distributed/shared_library_init.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -327,6 +328,7 @@ create_reference_table(PG_FUNCTION_ARGS)
* - we are on the coordinator * - we are on the coordinator
* - the current user is the owner of the table * - the current user is the owner of the table
* - relation kind is supported * - relation kind is supported
* - relation is not a shard
*/ */
static void static void
EnsureCitusTableCanBeCreated(Oid relationOid) EnsureCitusTableCanBeCreated(Oid relationOid)
@ -343,6 +345,14 @@ EnsureCitusTableCanBeCreated(Oid relationOid)
* will be performed in CreateDistributedTable. * will be performed in CreateDistributedTable.
*/ */
EnsureRelationKindSupported(relationOid); EnsureRelationKindSupported(relationOid);
/*
* When coordinator is added to the metadata, or on the workers,
* some of the relations of the coordinator node may/will be shards.
* We disallow creating distributed tables from shard relations, by
* erroring out here.
*/
ErrorIfRelationIsAKnownShard(relationOid);
} }

View File

@ -29,7 +29,6 @@
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
static void EnsureSequentialModeForDatabaseDDL(void);
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
static Oid get_database_owner(Oid db_oid); static Oid get_database_owner(Oid db_oid);
@ -66,7 +65,7 @@ PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString,
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialModeForDatabaseDDL(); EnsureSequentialMode(OBJECT_DATABASE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
@ -177,39 +176,3 @@ get_database_owner(Oid db_oid)
return dba; return dba;
} }
/*
* EnsureSequentialModeForDatabaseDDL makes sure that the current transaction is already
* in sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginning.
*/
static void
EnsureSequentialModeForDatabaseDDL(void)
{
if (!IsTransactionBlock())
{
/* we do not need to switch to sequential mode if we are not in a transaction */
return;
}
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot create or modify database because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail("When creating or altering a database, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail("Database is created or altered. To make sure subsequent "
"commands see the type correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}

View File

@ -37,7 +37,6 @@ static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt);
static List * FilterDistributedExtensions(List *extensionObjectList); static List * FilterDistributedExtensions(List *extensionObjectList);
static List * ExtensionNameListToObjectAddressList(List *extensionObjectList); static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
static void MarkExistingObjectDependenciesDistributedIfSupported(void); static void MarkExistingObjectDependenciesDistributedIfSupported(void);
static void EnsureSequentialModeForExtensionDDL(void);
static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool ShouldPropagateExtensionCommand(Node *parseTree);
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
static Node * RecreateExtensionStmt(Oid extensionOid); static Node * RecreateExtensionStmt(Oid extensionOid);
@ -163,7 +162,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString)
* Make sure that the current transaction is already in sequential mode, * Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode * or can still safely be put in sequential mode
*/ */
EnsureSequentialModeForExtensionDDL(); EnsureSequentialMode(OBJECT_EXTENSION);
/* /*
* Here we append "schema" field to the "options" list (if not specified) * Here we append "schema" field to the "options" list (if not specified)
@ -274,7 +273,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString,
* Make sure that the current transaction is already in sequential mode, * Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode * or can still safely be put in sequential mode
*/ */
EnsureSequentialModeForExtensionDDL(); EnsureSequentialMode(OBJECT_EXTENSION);
List *distributedExtensionAddresses = ExtensionNameListToObjectAddressList( List *distributedExtensionAddresses = ExtensionNameListToObjectAddressList(
distributedExtensions); distributedExtensions);
@ -409,7 +408,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString,
* Make sure that the current transaction is already in sequential mode, * Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode * or can still safely be put in sequential mode
*/ */
EnsureSequentialModeForExtensionDDL(); EnsureSequentialMode(OBJECT_EXTENSION);
const char *alterExtensionStmtSql = DeparseTreeNode(node); const char *alterExtensionStmtSql = DeparseTreeNode(node);
@ -478,7 +477,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString,
* Make sure that the current transaction is already in sequential mode, * Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode * or can still safely be put in sequential mode
*/ */
EnsureSequentialModeForExtensionDDL(); EnsureSequentialMode(OBJECT_EXTENSION);
const char *alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt); const char *alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt);
@ -603,44 +602,6 @@ PreprocessAlterExtensionContentsStmt(Node *node, const char *queryString,
} }
/*
* EnsureSequentialModeForExtensionDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginning.
*
* As extensions are node scoped objects there exists only 1 instance of the
* extension used by potentially multiple shards. To make sure all shards in
* the transaction can interact with the extension the extension needs to be
* visible on all connections used by the transaction, meaning we can only use
* 1 connection per node.
*/
static void
EnsureSequentialModeForExtensionDDL(void)
{
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot run extension command because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail(
"When running command on/for a distributed extension, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"A command for a distributed extension is run. To make sure subsequent "
"commands see the type correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}
/* /*
* ShouldPropagateExtensionCommand determines whether to propagate an extension * ShouldPropagateExtensionCommand determines whether to propagate an extension
* command to the worker nodes. * command to the worker nodes.

View File

@ -79,7 +79,6 @@ static int GetFunctionColocationId(Oid functionOid, char *colocateWithName, Oid
static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid
distributionColumnType, Oid distributionColumnType, Oid
sourceRelationId); sourceRelationId);
static void EnsureSequentialModeForFunctionDDL(void);
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static bool ShouldAddFunctionSignature(FunctionParameterMode mode); static bool ShouldAddFunctionSignature(FunctionParameterMode mode);
@ -211,7 +210,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
* when we allow propagation within a transaction block we should make sure * when we allow propagation within a transaction block we should make sure
* to only allow this in sequential mode. * to only allow this in sequential mode.
*/ */
EnsureSequentialModeForFunctionDDL(); EnsureSequentialMode(OBJECT_FUNCTION);
EnsureDependenciesExistOnAllNodes(&functionAddress); EnsureDependenciesExistOnAllNodes(&functionAddress);
@ -1173,48 +1172,6 @@ GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace)
} }
/*
* EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginning.
*
* As functions are node scoped objects there exists only 1 instance of the function used by
* potentially multiple shards. To make sure all shards in the transaction can interact
* with the function the function needs to be visible on all connections used by the transaction,
* meaning we can only use 1 connection per node.
*/
static void
EnsureSequentialModeForFunctionDDL(void)
{
if (!IsTransactionBlock())
{
/* we do not need to switch to sequential mode if we are not in a transaction */
return;
}
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot create function because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail("When creating a distributed function, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"A distributed function is created. To make sure subsequent "
"commands see the type correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}
/* /*
* ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION
* statement. * statement.
@ -1298,7 +1255,7 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString,
EnsureCoordinator(); EnsureCoordinator();
EnsureSequentialModeForFunctionDDL(); EnsureSequentialMode(OBJECT_FUNCTION);
/* /*
* ddl jobs will be generated during the postprocessing phase as we need the function to * ddl jobs will be generated during the postprocessing phase as we need the function to
@ -1483,7 +1440,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString,
EnsureCoordinator(); EnsureCoordinator();
ErrorIfUnsupportedAlterFunctionStmt(stmt); ErrorIfUnsupportedAlterFunctionStmt(stmt);
EnsureSequentialModeForFunctionDDL(); EnsureSequentialMode(OBJECT_FUNCTION);
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
@ -1517,7 +1474,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString,
} }
EnsureCoordinator(); EnsureCoordinator();
EnsureSequentialModeForFunctionDDL(); EnsureSequentialMode(OBJECT_FUNCTION);
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
@ -1549,7 +1506,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString,
} }
EnsureCoordinator(); EnsureCoordinator();
EnsureSequentialModeForFunctionDDL(); EnsureSequentialMode(OBJECT_FUNCTION);
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
@ -1582,7 +1539,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString,
} }
EnsureCoordinator(); EnsureCoordinator();
EnsureSequentialModeForFunctionDDL(); EnsureSequentialMode(OBJECT_FUNCTION);
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
@ -1672,7 +1629,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString,
* types, so we block the call. * types, so we block the call.
*/ */
EnsureCoordinator(); EnsureCoordinator();
EnsureSequentialModeForFunctionDDL(); EnsureSequentialMode(OBJECT_FUNCTION);
/* remove the entries for the distributed objects on dropping */ /* remove the entries for the distributed objects on dropping */
ObjectAddress *address = NULL; ObjectAddress *address = NULL;

View File

@ -42,7 +42,6 @@
static ObjectAddress GetObjectAddressBySchemaName(char *schemaName, bool missing_ok); static ObjectAddress GetObjectAddressBySchemaName(char *schemaName, bool missing_ok);
static List * FilterDistributedSchemas(List *schemas); static List * FilterDistributedSchemas(List *schemas);
static void EnsureSequentialModeForSchemaDDL(void);
static bool SchemaHasDistributedTableWithFKey(char *schemaName); static bool SchemaHasDistributedTableWithFKey(char *schemaName);
static bool ShouldPropagateCreateSchemaStmt(void); static bool ShouldPropagateCreateSchemaStmt(void);
@ -62,7 +61,7 @@ PreprocessCreateSchemaStmt(Node *node, const char *queryString,
EnsureCoordinator(); EnsureCoordinator();
EnsureSequentialModeForSchemaDDL(); EnsureSequentialMode(OBJECT_SCHEMA);
/* deparse sql*/ /* deparse sql*/
const char *sql = DeparseTreeNode(node); const char *sql = DeparseTreeNode(node);
@ -101,7 +100,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString,
return NIL; return NIL;
} }
EnsureSequentialModeForSchemaDDL(); EnsureSequentialMode(OBJECT_SCHEMA);
Value *schemaVal = NULL; Value *schemaVal = NULL;
foreach_ptr(schemaVal, distributedSchemas) foreach_ptr(schemaVal, distributedSchemas)
@ -204,7 +203,7 @@ PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString,
/* deparse sql*/ /* deparse sql*/
const char *renameStmtSql = DeparseTreeNode(node); const char *renameStmtSql = DeparseTreeNode(node);
EnsureSequentialModeForSchemaDDL(); EnsureSequentialMode(OBJECT_SCHEMA);
/* to prevent recursion with mx we disable ddl propagation */ /* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
@ -293,44 +292,6 @@ FilterDistributedSchemas(List *schemas)
} }
/*
* EnsureSequentialModeForSchemaDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginning.
*
* Copy-pasted from type.c
*/
static void
EnsureSequentialModeForSchemaDDL(void)
{
if (!IsTransactionBlock())
{
/* we do not need to switch to sequential mode if we are not in a transaction */
return;
}
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot create or modify schema because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail("When creating, altering, or dropping a schema, Citus "
"needs to perform all operations over a single "
"connection per node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail("Schema is created or altered. To make sure subsequent "
"commands see the schema correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}
/* /*
* SchemaHasDistributedTableWithFKey takes a schema name and scans the relations within * SchemaHasDistributedTableWithFKey takes a schema name and scans the relations within
* that schema. If any one of the relations has a foreign key relationship, it returns * that schema. If any one of the relations has a foreign key relationship, it returns

View File

@ -92,7 +92,6 @@ bool EnableCreateTypePropagation = true;
static List * FilterNameListForDistributedTypes(List *objects, bool missing_ok); static List * FilterNameListForDistributedTypes(List *objects, bool missing_ok);
static List * TypeNameListToObjectAddresses(List *objects); static List * TypeNameListToObjectAddresses(List *objects);
static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation); static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation);
static void EnsureSequentialModeForTypeDDL(void);
static Oid GetTypeOwner(Oid typeOid); static Oid GetTypeOwner(Oid typeOid);
/* recreate functions */ /* recreate functions */
@ -158,7 +157,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString,
* when we allow propagation within a transaction block we should make sure to only * when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode * allow this in sequential mode
*/ */
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) compositeTypeStmtSql, (void *) compositeTypeStmtSql,
@ -223,7 +222,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString,
* regardless if in a transaction or not. If we would not propagate the alter * regardless if in a transaction or not. If we would not propagate the alter
* statement the types would be different on worker and coordinator. * statement the types would be different on worker and coordinator.
*/ */
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) alterTypeStmtSql, (void *) alterTypeStmtSql,
@ -266,7 +265,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString,
* when we allow propagation within a transaction block we should make sure to only * when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode * allow this in sequential mode
*/ */
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
/* to prevent recursion with mx we disable ddl propagation */ /* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
@ -325,7 +324,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString,
* (adding values to an enum can not run in a transaction anyway and would error by * (adding values to an enum can not run in a transaction anyway and would error by
* postgres already). * postgres already).
*/ */
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
/* /*
* managing types can only be done on the coordinator if ddl propagation is on. when * managing types can only be done on the coordinator if ddl propagation is on. when
@ -405,7 +404,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString,
char *dropStmtSql = DeparseTreeNode((Node *) stmt); char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = oldTypes; stmt->objects = oldTypes;
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
/* to prevent recursion with mx we disable ddl propagation */ /* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
@ -442,7 +441,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString,
/* deparse sql*/ /* deparse sql*/
const char *renameStmtSql = DeparseTreeNode(node); const char *renameStmtSql = DeparseTreeNode(node);
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
/* to prevent recursion with mx we disable ddl propagation */ /* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
@ -480,7 +479,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString,
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
@ -513,7 +512,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString,
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql, (void *) sql,
@ -572,7 +571,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString,
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialModeForTypeDDL(); EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
@ -1130,47 +1129,6 @@ MakeTypeNameFromRangeVar(const RangeVar *relation)
} }
/*
* EnsureSequentialModeForTypeDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginning.
*
* As types are node scoped objects there exists only 1 instance of the type used by
* potentially multiple shards. To make sure all shards in the transaction can interact
* with the type the type needs to be visible on all connections used by the transaction,
* meaning we can only use 1 connection per node.
*/
static void
EnsureSequentialModeForTypeDDL(void)
{
if (!IsTransactionBlock())
{
/* we do not need to switch to sequential mode if we are not in a transaction */
return;
}
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot create or modify type because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail("When creating or altering a type, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail("Type is created or altered. To make sure subsequent "
"commands see the type correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}
/* /*
* ShouldPropagateTypeCreate returns if we should propagate the creation of a type. * ShouldPropagateTypeCreate returns if we should propagate the creation of a type.
* *

View File

@ -32,6 +32,7 @@
#include "distributed/distributed_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
@ -81,6 +82,7 @@ int ExecutorLevel = 0;
/* local function forward declarations */ /* local function forward declarations */
static Relation StubRelation(TupleDesc tupleDescriptor); static Relation StubRelation(TupleDesc tupleDescriptor);
static char * GetObjectTypeString(ObjectType objType);
static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static bool AlterTableConstraintCheck(QueryDesc *queryDesc);
static List * FindCitusCustomScanStates(PlanState *planState); static List * FindCitusCustomScanStates(PlanState *planState);
static bool CitusCustomScanStateWalker(PlanState *planState, static bool CitusCustomScanStateWalker(PlanState *planState,
@ -691,6 +693,98 @@ SetLocalMultiShardModifyModeToSequential()
} }
/*
* EnsureSequentialMode makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginning.
*
* Takes an ObjectType to use in the error/debug messages.
*/
void
EnsureSequentialMode(ObjectType objType)
{
char *objTypeString = GetObjectTypeString(objType);
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot run %s command because there was a "
"parallel operation on a distributed table in the "
"transaction", objTypeString),
errdetail("When running command on/for a distributed %s, Citus "
"needs to perform all operations over a single "
"connection per node to ensure consistency.",
objTypeString),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"A command for a distributed %s is run. To make sure subsequent "
"commands see the %s correctly we need to make sure to "
"use only one connection for all future commands",
objTypeString, objTypeString)));
SetLocalMultiShardModifyModeToSequential();
}
/*
* GetObjectTypeString takes an ObjectType and returns the string version of it.
* We (for now) call this function only in EnsureSequentialMode, and use the returned
* string to generate error/debug messages.
*
* If GetObjectTypeString gets called with an ObjectType that is not in the switch
* statement, the function will return the string "object", and emit a debug message.
* In that case, make sure you've added the newly supported type to the switch statement.
*/
static char *
GetObjectTypeString(ObjectType objType)
{
switch (objType)
{
case OBJECT_COLLATION:
{
return "collation";
}
case OBJECT_DATABASE:
{
return "database";
}
case OBJECT_EXTENSION:
{
return "extension";
}
case OBJECT_FUNCTION:
{
return "function";
}
case OBJECT_SCHEMA:
{
return "schema";
}
case OBJECT_TYPE:
{
return "type";
}
default:
{
ereport(DEBUG1, (errmsg("unsupported object type"),
errdetail("Please add string conversion for the object.")));
return "object";
}
}
}
/* /*
* AlterTableConstraintCheck returns if the given query is an ALTER TABLE * AlterTableConstraintCheck returns if the given query is an ALTER TABLE
* constraint check query. * constraint check query.

View File

@ -513,6 +513,9 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
* are already given away. * are already given away.
*/ */
DeallocateReservedConnections(); DeallocateReservedConnections();
/* we don't want any monitoring view/udf to show already exited backends */
UnSetGlobalPID();
} }

View File

@ -90,7 +90,6 @@ static BackendData *MyBackendData = NULL;
static void BackendManagementShmemInit(void); static void BackendManagementShmemInit(void);
static size_t BackendManagementShmemSize(void); static size_t BackendManagementShmemSize(void);
static void UnSetGlobalPID(void);
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
@ -674,7 +673,7 @@ UnSetDistributedTransactionId(void)
/* /*
* UnSetGlobalPID resets the global pid for the current backend. * UnSetGlobalPID resets the global pid for the current backend.
*/ */
static void void
UnSetGlobalPID(void) UnSetGlobalPID(void)
{ {
/* backend does not exist if the extension is not created */ /* backend does not exist if the extension is not created */

View File

@ -62,6 +62,7 @@ extern void InitializeBackendData(void);
extern void LockBackendSharedMemory(LWLockMode lockMode); extern void LockBackendSharedMemory(LWLockMode lockMode);
extern void UnlockBackendSharedMemory(void); extern void UnlockBackendSharedMemory(void);
extern void UnSetDistributedTransactionId(void); extern void UnSetDistributedTransactionId(void);
extern void UnSetGlobalPID(void);
extern void AssignDistributedTransactionId(void); extern void AssignDistributedTransactionId(void);
extern void MarkCitusInitiatedCoordinatorBackend(void); extern void MarkCitusInitiatedCoordinatorBackend(void);
extern void AssignGlobalPID(void); extern void AssignGlobalPID(void);

View File

@ -139,6 +139,7 @@ extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params,
extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
DestReceiver *dest); DestReceiver *dest);
extern void SetLocalMultiShardModifyModeToSequential(void); extern void SetLocalMultiShardModifyModeToSequential(void);
extern void EnsureSequentialMode(ObjectType objType);
extern void SetLocalForceMaxQueryParallelization(void); extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState); extern void SortTupleStore(CitusScanState *scanState);
extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan); extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);

View File

@ -165,8 +165,8 @@ SELECT count(*) FROM t; -- parallel execution;
(1 row) (1 row)
ALTER DATABASE regression OWNER TO database_owner_2; -- should ERROR ALTER DATABASE regression OWNER TO database_owner_2; -- should ERROR
ERROR: cannot create or modify database because there was a parallel operation on a distributed table in the transaction ERROR: cannot run database command because there was a parallel operation on a distributed table in the transaction
DETAIL: When creating or altering a database, Citus needs to perform all operations over a single connection per node to ensure consistency. DETAIL: When running command on/for a distributed database, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK; ROLLBACK;
-- list the owners of the current database on all nodes -- list the owners of the current database on all nodes

View File

@ -1602,6 +1602,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
(1 row) (1 row)
DROP SCHEMA local_dist_join_mixed CASCADE; DROP SCHEMA local_dist_join_mixed CASCADE;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed schema is run. To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands
NOTICE: drop cascades to 7 other objects NOTICE: drop cascades to 7 other objects
DETAIL: drop cascades to table distributed DETAIL: drop cascades to table distributed
drop cascades to table reference drop cascades to table reference

View File

@ -1376,8 +1376,8 @@ BEGIN;
(1 row) (1 row)
ALTER SCHEMA bar RENAME TO foo; ALTER SCHEMA bar RENAME TO foo;
ERROR: cannot create or modify schema because there was a parallel operation on a distributed table in the transaction ERROR: cannot run schema command because there was a parallel operation on a distributed table in the transaction
DETAIL: When creating, altering, or dropping a schema, Citus needs to perform all operations over a single connection per node to ensure consistency. DETAIL: When running command on/for a distributed schema, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK; ROLLBACK;
BEGIN; BEGIN;

View File

@ -8,6 +8,8 @@
-- =================================================================== -- ===================================================================
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
CREATE SCHEMA non_colocated_subquery; CREATE SCHEMA non_colocated_subquery;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed schema is run. To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands
SET search_path TO non_colocated_subquery, public; SET search_path TO non_colocated_subquery, public;
-- we don't use the data anyway -- we don't use the data anyway
CREATE TABLE users_table_local AS SELECT * FROM users_table LIMIT 0; CREATE TABLE users_table_local AS SELECT * FROM users_table LIMIT 0;

View File

@ -2011,8 +2011,28 @@ RESET citus.enable_manual_changes_to_shards ;
-- these should work as expected -- these should work as expected
TRUNCATE TABLE test_disabling_drop_and_truncate_102040; TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_102040; DROP TABLE test_disabling_drop_and_truncate_102040;
RESET citus.shard_replication_factor;
DROP TABLE test_disabling_drop_and_truncate; DROP TABLE test_disabling_drop_and_truncate;
-- test creating distributed or reference tables from shards
CREATE TABLE test_creating_distributed_relation_table_from_shard (a int);
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- these should error because shards cannot be used to:
-- create distributed table
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a');
ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation
-- create reference table
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044');
ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation
RESET citus.shard_replication_factor;
DROP TABLE test_creating_distributed_relation_table_from_shard;
-- lets flush the copy often to make sure everyhing is fine -- lets flush the copy often to make sure everyhing is fine
SET citus.local_copy_flush_threshold TO 1; SET citus.local_copy_flush_threshold TO 1;
TRUNCATE another_schema_table; TRUNCATE another_schema_table;

View File

@ -31,9 +31,9 @@ SELECT * FROM table_sizes;
name | has_data name | has_data
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_local | f citus_local | f
citus_local_102045 | t citus_local_102049 | t
ref | t ref | t
ref_102044 | t ref_102048 | t
(4 rows) (4 rows)
-- verify that this UDF is noop on Citus local tables -- verify that this UDF is noop on Citus local tables
@ -47,9 +47,9 @@ SELECT * FROM table_sizes;
name | has_data name | has_data
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_local | f citus_local | f
citus_local_102045 | t citus_local_102049 | t
ref | t ref | t
ref_102044 | t ref_102048 | t
(4 rows) (4 rows)
-- test that we allow cascading truncates to citus local tables -- test that we allow cascading truncates to citus local tables
@ -65,9 +65,9 @@ SELECT * FROM table_sizes;
name | has_data name | has_data
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_local | f citus_local | f
citus_local_102045 | t citus_local_102049 | t
ref | f ref | f
ref_102044 | t ref_102048 | t
(4 rows) (4 rows)
ROLLBACK; ROLLBACK;
@ -98,14 +98,14 @@ SELECT * FROM table_sizes;
name | has_data name | has_data
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_local | f citus_local | f
citus_local_102045 | t citus_local_102049 | t
dist | f dist | f
dist_102047 | t dist_102051 | t
dist_102048 | t dist_102052 | t
dist_102049 | t dist_102053 | t
dist_102050 | t dist_102054 | t
ref | f ref | f
ref_102044 | t ref_102048 | t
(9 rows) (9 rows)
ROLLBACK; ROLLBACK;
@ -121,14 +121,14 @@ SELECT * FROM table_sizes;
name | has_data name | has_data
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_local | f citus_local | f
citus_local_102045 | t citus_local_102049 | t
dist | f dist | f
dist_102047 | t dist_102051 | t
dist_102048 | t dist_102052 | t
dist_102049 | t dist_102053 | t
dist_102050 | t dist_102054 | t
ref | t ref | t
ref_102044 | t ref_102048 | t
(9 rows) (9 rows)
ROLLBACK; ROLLBACK;

View File

@ -1017,9 +1017,22 @@ RESET citus.enable_manual_changes_to_shards ;
TRUNCATE TABLE test_disabling_drop_and_truncate_102040; TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_102040; DROP TABLE test_disabling_drop_and_truncate_102040;
RESET citus.shard_replication_factor;
DROP TABLE test_disabling_drop_and_truncate; DROP TABLE test_disabling_drop_and_truncate;
-- test creating distributed or reference tables from shards
CREATE TABLE test_creating_distributed_relation_table_from_shard (a int);
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a');
-- these should error because shards cannot be used to:
-- create distributed table
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a');
-- create reference table
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044');
RESET citus.shard_replication_factor;
DROP TABLE test_creating_distributed_relation_table_from_shard;
-- lets flush the copy often to make sure everyhing is fine -- lets flush the copy often to make sure everyhing is fine
SET citus.local_copy_flush_threshold TO 1; SET citus.local_copy_flush_threshold TO 1;
TRUNCATE another_schema_table; TRUNCATE another_schema_table;