Add extensions to distributed object propagation infrastructure (#3185)

pull/3203/head^2
Onur TIRTIR 2019-11-19 17:56:28 +03:00 committed by GitHub
parent 2cb82ae9bd
commit 26c306d188
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 2752 additions and 62 deletions

View File

@ -182,6 +182,11 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return CreateTypeDDLCommandsIdempotent(dependency);
}
case OCLASS_EXTENSION:
{
return CreateExtensionDDLCommand(dependency);
}
default:
{
break;

View File

@ -11,35 +11,32 @@
#include "postgres.h"
#include "citus_version.h"
#include "catalog/pg_extension_d.h"
#include "commands/extension.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/metadata_cache.h"
#include "nodes/parsenodes.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/transaction_management.h"
#include "nodes/makefuncs.h"
#include "utils/lsyscache.h"
#include "utils/builtins.h"
/* Local functions forward declarations for helper functions */
static char * ExtractNewExtensionVersion(Node *parsetree);
/*
* IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER
* EXTENSION statement which references the citus extension. This function
* returns false for all other inputs.
*/
bool
IsCitusExtensionStmt(Node *parsetree)
{
char *extensionName = "";
if (IsA(parsetree, CreateExtensionStmt))
{
extensionName = ((CreateExtensionStmt *) parsetree)->extname;
}
else if (IsA(parsetree, AlterExtensionStmt))
{
extensionName = ((AlterExtensionStmt *) parsetree)->extname;
}
return (strcmp(extensionName, "citus") == 0);
}
static char * ExtractNewExtensionVersion(Node *parseTree);
static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt);
static List * FilterDistributedExtensions(List *extensionObjectList);
static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
static void EnsureSequentialModeForExtensionDDL(void);
static bool ShouldPropagateExtensionCommand(Node *parseTree);
static bool IsDropCitusStmt(Node *parseTree);
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
static Node * RecreateExtensionStmt(Oid extensionOid);
/*
@ -49,9 +46,9 @@ IsCitusExtensionStmt(Node *parsetree)
* out. It ignores the schema version.
*/
void
ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree)
ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parseTree)
{
char *newExtensionVersion = ExtractNewExtensionVersion(parsetree);
char *newExtensionVersion = ExtractNewExtensionVersion(parseTree);
if (newExtensionVersion != NULL)
{
@ -78,24 +75,24 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree)
/*
* ExtractNewExtensionVersion returns the new extension version specified by
* a CREATE or ALTER EXTENSION statement. Other inputs are not permitted. This
* function returns NULL for statements with no explicit version specified.
* ExtractNewExtensionVersion returns the palloc'd new extension version specified
* by a CREATE or ALTER EXTENSION statement. Other inputs are not permitted.
* This function returns NULL for statements with no explicit version specified.
*/
static char *
ExtractNewExtensionVersion(Node *parsetree)
ExtractNewExtensionVersion(Node *parseTree)
{
char *newVersion = NULL;
List *optionsList = NIL;
ListCell *optionsCell = NULL;
Value *newVersionValue = NULL;
if (IsA(parsetree, CreateExtensionStmt))
List *optionsList = NIL;
if (IsA(parseTree, CreateExtensionStmt))
{
optionsList = ((CreateExtensionStmt *) parsetree)->options;
optionsList = ((CreateExtensionStmt *) parseTree)->options;
}
else if (IsA(parsetree, AlterExtensionStmt))
else if (IsA(parseTree, AlterExtensionStmt))
{
optionsList = ((AlterExtensionStmt *) parsetree)->options;
optionsList = ((AlterExtensionStmt *) parseTree)->options;
}
else
{
@ -103,15 +100,739 @@ ExtractNewExtensionVersion(Node *parsetree)
Assert(false);
}
foreach(optionsCell, optionsList)
newVersionValue = GetExtensionOption(optionsList, "new_version");
/* return target string safely */
if (newVersionValue)
{
DefElem *defElement = (DefElem *) lfirst(optionsCell);
if (strncmp(defElement->defname, "new_version", NAMEDATALEN) == 0)
const char *newVersion = strVal(newVersionValue);
return pstrdup(newVersion);
}
else
{
return NULL;
}
}
/*
* PlanCreateExtensionStmt is called during the creation of an extension.
* It is executed before the statement is applied locally.
* We decide if the extension needs to be replicated to the worker, and
* if that is the case return a list of DDLJob's that describe how and
* where the extension needs to be created.
*/
List *
PlanCreateExtensionStmt(CreateExtensionStmt *createExtensionStmt, const char *queryString)
{
List *commands = NIL;
const char *createExtensionStmtSql = NULL;
if (!ShouldPropagateExtensionCommand((Node *) createExtensionStmt))
{
return NIL;
}
/*
* If the extension command is a part of a bigger multi-statement transaction,
* do not propagate it
*/
if (IsMultiStatementTransaction())
{
return NIL;
}
/* extension management can only be done via coordinator node */
EnsureCoordinator();
/*
* Make sure that no new nodes are added after this point until the end of the
* transaction by taking a RowShareLock on pg_dist_node, which conflicts with the
* ExclusiveLock taken by master_add_node.
* This guarantees that all active nodes will have the extension, because they will
* either get it now, or get it in master_add_node after this transaction finishes and
* the pg_dist_object record becomes visible.
*/
LockRelationOid(DistNodeRelationId(), RowShareLock);
/*
* Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode
*/
EnsureSequentialModeForExtensionDDL();
/*
* Here we append "schema" field to the "options" list (if not specified)
* to satisfy the schema consistency between worker nodes and the coordinator.
*/
AddSchemaFieldIfMissing(createExtensionStmt);
createExtensionStmtSql = DeparseTreeNode((Node *) createExtensionStmt);
/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createExtensionStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* AddSchemaFieldIfMissing adds DefElem item for "schema" (if not specified
* in statement) to "options" list before deparsing the statement to satisfy
* the schema consistency between worker nodes and the coordinator.
*/
static void
AddSchemaFieldIfMissing(CreateExtensionStmt *createExtensionStmt)
{
List *optionsList = createExtensionStmt->options;
Value *schemaNameValue = GetExtensionOption(optionsList, "schema");
if (!schemaNameValue)
{
/*
* As we already created the extension by standard_ProcessUtility,
* we actually know the schema it belongs to
*/
bool missingOk = false;
Oid extensionOid = get_extension_oid(createExtensionStmt->extname, missingOk);
Oid extensionSchemaOid = get_extension_schema(extensionOid);
char *extensionSchemaName = get_namespace_name(extensionSchemaOid);
Node *schemaNameArg = (Node *) makeString(extensionSchemaName);
/* set location to -1 as it is unknown */
int location = -1;
DefElem *newDefElement = makeDefElem("schema", schemaNameArg, location);
createExtensionStmt->options = lappend(createExtensionStmt->options,
newDefElement);
}
}
/*
* ProcessCreateExtensionStmt is executed after the extension has been
* created locally and before we create it on the worker nodes.
* As we now have access to ObjectAddress of the extension that is just
* created, we can mark it as distributed to make sure that its
* dependencies exist on all nodes.
*/
void
ProcessCreateExtensionStmt(CreateExtensionStmt *createExtensionStmt, const
char *queryString)
{
const ObjectAddress *extensionAddress = NULL;
if (!ShouldPropagateExtensionCommand((Node *) createExtensionStmt))
{
return;
}
/*
* If the extension command is a part of a bigger multi-statement transaction,
* do not propagate it
*/
if (IsMultiStatementTransaction())
{
return;
}
extensionAddress = GetObjectAddressFromParseTree((Node *) createExtensionStmt, false);
EnsureDependenciesExistsOnAllNodes(extensionAddress);
MarkObjectDistributed(extensionAddress);
}
/*
* PlanDropExtensionStmt is called to drop extension(s) in coordinator and
* in worker nodes if distributed before.
* We first ensure that we keep only the distributed ones before propagating
* the statement to worker nodes.
* If no extensions in the drop list are distributed, then no calls will
* be made to the workers.
*/
List *
PlanDropExtensionStmt(DropStmt *dropStmt, const char *queryString)
{
List *allDroppedExtensions = dropStmt->objects;
List *distributedExtensions = NIL;
List *distributedExtensionAddresses = NIL;
List *commands = NIL;
const char *deparsedStmt = NULL;
ListCell *addressCell = NULL;
if (!ShouldPropagateExtensionCommand((Node *) dropStmt))
{
return NIL;
}
/* get distributed extensions to be dropped in worker nodes as well */
distributedExtensions = FilterDistributedExtensions(allDroppedExtensions);
if (list_length(distributedExtensions) <= 0)
{
/* no distributed extensions to drop */
return NIL;
}
/* extension management can only be done via coordinator node */
EnsureCoordinator();
/*
* Make sure that no new nodes are added after this point until the end of the
* transaction by taking a RowShareLock on pg_dist_node, which conflicts with the
* ExclusiveLock taken by master_add_node.
* This guarantees that all active nodes will drop the extension, because they will
* either get it now, or get it in master_add_node after this transaction finishes and
* the pg_dist_object record becomes visible.
*/
LockRelationOid(DistNodeRelationId(), RowShareLock);
/*
* Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode
*/
EnsureSequentialModeForExtensionDDL();
distributedExtensionAddresses = ExtensionNameListToObjectAddressList(
distributedExtensions);
/* unmark each distributed extension */
foreach(addressCell, distributedExtensionAddresses)
{
ObjectAddress *address = (ObjectAddress *) lfirst(addressCell);
UnmarkObjectDistributed(address);
}
/*
* Temporary swap the lists of objects to delete with the distributed
* objects and deparse to an sql statement for the workers.
* Then switch back to allDroppedExtensions to drop all specified
* extensions in coordinator after PlanDropExtensionStmt completes
* its execution.
*/
dropStmt->objects = distributedExtensions;
deparsedStmt = DeparseTreeNode((Node *) dropStmt);
dropStmt->objects = allDroppedExtensions;
/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) deparsedStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* FilterDistributedExtensions returns the distributed objects in an "objects"
* list of a DropStmt, a list having the format of a "DropStmt.objects" list.
* That is, in turn, a list of string "Value"s.
*/
static List *
FilterDistributedExtensions(List *extensionObjectList)
{
List *extensionNameList = NIL;
bool missingOk = true;
ListCell *objectCell = NULL;
foreach(objectCell, extensionObjectList)
{
char *extensionName = strVal(lfirst(objectCell));
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
Oid extensionOid = get_extension_oid(extensionName, missingOk);
if (!OidIsValid(extensionOid))
{
newVersion = strVal(defElement->arg);
break;
continue;
}
ObjectAddressSet(*address, ExtensionRelationId, extensionOid);
if (!IsObjectDistributed(address))
{
continue;
}
extensionNameList = lappend(extensionNameList, makeString(extensionName));
}
return extensionNameList;
}
/*
* ExtensionNameListToObjectAddressList returns the object addresses in
* an ObjectAddress list for an "objects" list of a DropStmt.
* Callers of this function should ensure that all the objects in the list
* are valid and distributed.
*/
static List *
ExtensionNameListToObjectAddressList(List *extensionObjectList)
{
List *extensionObjectAddressList = NIL;
ListCell *objectCell = NULL;
foreach(objectCell, extensionObjectList)
{
/*
* We set missingOk to false as we assume all the objects in
* extensionObjectList list are valid and distributed.
*/
bool missingOk = false;
const char *extensionName = strVal(lfirst(objectCell));
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
Oid extensionOid = get_extension_oid(extensionName, missingOk);
ObjectAddressSet(*address, ExtensionRelationId, extensionOid);
extensionObjectAddressList = lappend(extensionObjectAddressList, address);
}
return extensionObjectAddressList;
}
/*
* PlanAlterExtensionSchemaStmt is invoked for alter extension set schema statements.
*/
List *
PlanAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt, const
char *queryString)
{
const char *alterExtensionStmtSql = NULL;
List *commands = NIL;
if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt))
{
return NIL;
}
/* extension management can only be done via coordinator node */
EnsureCoordinator();
/*
* Make sure that no new nodes are added after this point until the end of the
* transaction by taking a RowShareLock on pg_dist_node, which conflicts with the
* ExclusiveLock taken by master_add_node.
* This guarantees that all active nodes will update the extension schema after
* this transaction finishes and the pg_dist_object record becomes visible.
*/
LockRelationOid(DistNodeRelationId(), RowShareLock);
/*
* Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode
*/
EnsureSequentialModeForExtensionDDL();
alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt);
/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) alterExtensionStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* ProcessAlterExtensionSchemaStmt is executed after the change has been applied
* locally, we can now use the new dependencies (schema) of the extension to ensure
* all its dependencies exist on the workers before we apply the commands remotely.
*/
void
ProcessAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt, const
char *queryString)
{
const ObjectAddress *extensionAddress = NULL;
extensionAddress = GetObjectAddressFromParseTree((Node *) alterExtensionStmt, false);
if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt))
{
return;
}
/* dependencies (schema) have changed let's ensure they exist */
EnsureDependenciesExistsOnAllNodes(extensionAddress);
}
/*
* PlanAlterExtensionUpdateStmt is invoked for alter extension update statements.
*/
List *
PlanAlterExtensionUpdateStmt(AlterExtensionStmt *alterExtensionStmt, const
char *queryString)
{
const char *alterExtensionStmtSql = NULL;
List *commands = NIL;
if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt))
{
return NIL;
}
/* extension management can only be done via coordinator node */
EnsureCoordinator();
/*
* Make sure that no new nodes are added after this point until the end of the
* transaction by taking a RowShareLock on pg_dist_node, which conflicts with the
* ExclusiveLock taken by master_add_node.
* This guarantees that all active nodes will update the extension version, because
* they will either get it now, or get it in master_add_node after this transaction
* finishes and the pg_dist_object record becomes visible.
*/
LockRelationOid(DistNodeRelationId(), RowShareLock);
/*
* Make sure that the current transaction is already in sequential mode,
* or can still safely be put in sequential mode
*/
EnsureSequentialModeForExtensionDDL();
alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt);
/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) alterExtensionStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* EnsureSequentialModeForExtensionDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginnig.
*
* As extensions are node scoped objects there exists only 1 instance of the
* extension used by potentially multiple shards. To make sure all shards in
* the transaction can interact with the extension the extension needs to be
* visible on all connections used by the transaction, meaning we can only use
* 1 connection per node.
*/
static void
EnsureSequentialModeForExtensionDDL(void)
{
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot run extension command because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail(
"When running command on/for a distributed extension, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"A command for a distributed extension is run. To make sure subsequent "
"commands see the type correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}
/*
* ShouldPropagateExtensionCommand determines whether to propagate an extension
* command to the worker nodes.
*/
static bool
ShouldPropagateExtensionCommand(Node *parseTree)
{
/* if we disabled object propagation, then we should not propagate anything. */
if (!EnableDependencyCreation)
{
return false;
}
/*
* If extension command is run for/on citus, leave the rest to standard utility hook
* by returning false.
*/
if (IsCreateAlterExtensionUpdateCitusStmt(parseTree))
{
return false;
}
else if (IsDropCitusStmt(parseTree))
{
return false;
}
else if (IsAlterExtensionSetSchemaCitus(parseTree))
{
return false;
}
return true;
}
/*
* IsCreateAlterExtensionUpdateCitusStmt returns whether a given utility is a
* CREATE or ALTER EXTENSION UPDATE statement which references the citus extension.
* This function returns false for all other inputs.
*/
bool
IsCreateAlterExtensionUpdateCitusStmt(Node *parseTree)
{
const char *extensionName = NULL;
if (IsA(parseTree, CreateExtensionStmt))
{
extensionName = ((CreateExtensionStmt *) parseTree)->extname;
}
else if (IsA(parseTree, AlterExtensionStmt))
{
extensionName = ((AlterExtensionStmt *) parseTree)->extname;
}
else
{
/*
* If it is not a Create Extension or a Alter Extension stmt,
* it does not matter if the it is about citus
*/
return false;
}
/*
* Now that we have CreateExtensionStmt or AlterExtensionStmt,
* check if it is run for/on citus
*/
return (strncasecmp(extensionName, CITUS_NAME, NAMEDATALEN) == 0);
}
/*
* IsDropCitusStmt iterates the objects to be dropped in a drop statement
* and try to find citus there.
*/
static bool
IsDropCitusStmt(Node *parseTree)
{
ListCell *objectCell = NULL;
/* if it is not a DropStmt, it is needless to search for citus */
if (!IsA(parseTree, DropStmt))
{
return false;
}
/* now that we have a DropStmt, check if citus is among the objects to dropped */
foreach(objectCell, ((DropStmt *) parseTree)->objects)
{
const char *extensionName = strVal(lfirst(objectCell));
if (strncasecmp(extensionName, CITUS_NAME, NAMEDATALEN) == 0)
{
return true;
}
}
return newVersion;
return false;
}
/*
* IsAlterExtensionSetSchemaCitus returns whether a given utility is an
* ALTER EXTENSION SET SCHEMA statement which references the citus extension.
* This function returns false for all other inputs.
*/
static bool
IsAlterExtensionSetSchemaCitus(Node *parseTree)
{
const char *extensionName = NULL;
if (IsA(parseTree, AlterObjectSchemaStmt))
{
AlterObjectSchemaStmt *alterExtensionSetSchemaStmt =
(AlterObjectSchemaStmt *) parseTree;
if (alterExtensionSetSchemaStmt->objectType == OBJECT_EXTENSION)
{
extensionName = strVal(((AlterObjectSchemaStmt *) parseTree)->object);
/*
* Now that we have AlterObjectSchemaStmt for an extension,
* check if it is run for/on citus
*/
return (strncasecmp(extensionName, CITUS_NAME, NAMEDATALEN) == 0);
}
}
return false;
}
/*
* CreateExtensionDDLCommand returns a list of DDL statements (const char *) to be
* executed on a node to recreate the extension addressed by the extensionAddress.
*/
List *
CreateExtensionDDLCommand(const ObjectAddress *extensionAddress)
{
List *ddlCommands = NIL;
const char *ddlCommand = NULL;
Node *stmt = NULL;
/* generate a statement for creation of the extension in "if not exists" construct */
stmt = RecreateExtensionStmt(extensionAddress->objectId);
/* capture ddl command for the create statement */
ddlCommand = DeparseTreeNode(stmt);
ddlCommands = list_make1((void *) ddlCommand);
return ddlCommands;
}
/*
* RecreateEnumStmt returns a parsetree for a CREATE EXTENSION statement that would
* recreate the given extension on a new node.
*/
static Node *
RecreateExtensionStmt(Oid extensionOid)
{
CreateExtensionStmt *createExtensionStmt = makeNode(CreateExtensionStmt);
char *extensionName = get_extension_name(extensionOid);
if (!extensionName)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("extension with oid %u does not exist",
extensionOid)));
}
/* schema DefElement related variables */
Oid extensionSchemaOid = InvalidOid;
char *extensionSchemaName = NULL;
Node *schemaNameArg = NULL;
/* set location to -1 as it is unknown */
int location = -1;
DefElem *schemaDefElement = NULL;
/* set extension name and if_not_exists fields */
createExtensionStmt->extname = extensionName;
createExtensionStmt->if_not_exists = true;
/* get schema name that extension was created on */
extensionSchemaOid = get_extension_schema(extensionOid);
extensionSchemaName = get_namespace_name(extensionSchemaOid);
/* make DefEleme for extensionSchemaName */
schemaNameArg = (Node *) makeString(extensionSchemaName);
schemaDefElement = makeDefElem("schema", schemaNameArg, location);
/* append the schema name DefElem finally */
createExtensionStmt->options = lappend(createExtensionStmt->options,
schemaDefElement);
return (Node *) createExtensionStmt;
}
/*
* AlterExtensionSchemaStmtObjectAddress returns the ObjectAddress of the extension that is
* the subject of the AlterObjectSchemaStmt. Errors if missing_ok is false.
*/
const ObjectAddress *
AlterExtensionSchemaStmtObjectAddress(AlterObjectSchemaStmt *alterExtensionSchemaStmt,
bool missing_ok)
{
ObjectAddress *extensionAddress = NULL;
Oid extensionOid = InvalidOid;
const char *extensionName = NULL;
Assert(alterExtensionSchemaStmt->objectType == OBJECT_EXTENSION);
extensionName = strVal(alterExtensionSchemaStmt->object);
extensionOid = get_extension_oid(extensionName, missing_ok);
if (extensionOid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("extension \"%s\" does not exist",
extensionName)));
}
extensionAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*extensionAddress, ExtensionRelationId, extensionOid);
return extensionAddress;
}
/*
* AlterExtensionUpdateStmtObjectAddress returns the ObjectAddress of the extension that is
* the subject of the AlterExtensionStmt. Errors if missing_ok is false.
*/
const ObjectAddress *
AlterExtensionUpdateStmtObjectAddress(AlterExtensionStmt *alterExtensionStmt,
bool missing_ok)
{
ObjectAddress *extensionAddress = NULL;
Oid extensionOid = InvalidOid;
const char *extensionName = NULL;
extensionName = alterExtensionStmt->extname;
extensionOid = get_extension_oid(extensionName, missing_ok);
if (extensionOid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("extension \"%s\" does not exist",
extensionName)));
}
extensionAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*extensionAddress, ExtensionRelationId, extensionOid);
return extensionAddress;
}

View File

@ -128,6 +128,11 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString)
return PlanAlterFunctionSchemaStmt(stmt, queryString);
}
case OBJECT_EXTENSION:
{
return PlanAlterExtensionSchemaStmt(stmt, queryString);
}
default:
{
/* do nothing for unsupported objects */
@ -205,6 +210,12 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStrin
return;
}
case OBJECT_EXTENSION:
{
ProcessAlterExtensionSchemaStmt(stmt, queryString);
return;
}
default:
{
/* do nothing for unsupported objects */

View File

@ -117,7 +117,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
{
Node *parsetree = pstmt->utilityStmt;
List *ddlJobs = NIL;
bool checkExtensionVersion = false;
bool checkCreateAlterExtensionVersion = false;
if (IsA(parsetree, TransactionStmt) ||
IsA(parsetree, LockStmt) ||
@ -143,13 +143,12 @@ multi_ProcessUtility(PlannedStmt *pstmt,
return;
}
checkExtensionVersion = IsCitusExtensionStmt(parsetree);
if (EnableVersionChecks && checkExtensionVersion)
checkCreateAlterExtensionVersion = IsCreateAlterExtensionUpdateCitusStmt(parsetree);
if (EnableVersionChecks && checkCreateAlterExtensionVersion)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
}
if (!CitusHasBeenLoaded())
{
/*
@ -430,6 +429,13 @@ multi_ProcessUtility(PlannedStmt *pstmt,
case OBJECT_FUNCTION:
{
ddlJobs = PlanDropFunctionStmt(dropStatement, queryString);
break;
}
case OBJECT_EXTENSION:
{
ddlJobs = PlanDropExtensionStmt(dropStatement, queryString);
break;
}
default:
@ -565,6 +571,20 @@ multi_ProcessUtility(PlannedStmt *pstmt,
castNode(AlterObjectDependsStmt, parsetree), queryString);
}
if (IsA(parsetree, AlterExtensionStmt))
{
ddlJobs = PlanAlterExtensionUpdateStmt(castNode(AlterExtensionStmt,
parsetree), queryString);
}
if (IsA(parsetree, AlterExtensionContentsStmt))
{
ereport(NOTICE, (errmsg(
"Citus does not propagate adding/dropping member objects"),
errhint(
"You can add/drop the member objects on the workers as well.")));
}
/*
* ALTER TABLE ALL IN TABLESPACE statements have their node type as
* AlterTableMoveAllStmt. At the moment we do not support this functionality in
@ -795,6 +815,20 @@ multi_ProcessUtility(PlannedStmt *pstmt,
ProcessAlterTableStmtAttachPartition(alterTableStatement);
}
/*
* We call PlanCreateExtensionStmt and ProcessCreateExtensionStmt after standard_ProcessUtility
* does its work to learn the schema that the extension belongs to (if statement does not include
* WITH SCHEMA clause)
*/
if (EnableDDLPropagation && IsA(parsetree, CreateExtensionStmt))
{
CreateExtensionStmt *createExtensionStmt = castNode(CreateExtensionStmt,
parsetree);
ddlJobs = PlanCreateExtensionStmt(createExtensionStmt, queryString);
ProcessCreateExtensionStmt(createExtensionStmt, queryString);
}
/* don't run post-process code for local commands */
if (ddlJobs != NIL)
{

View File

@ -37,6 +37,10 @@ static const char * DeparseAlterObjectDependsStmt(AlterObjectDependsStmt *stmt);
*
* - ALTER FUNCTION, ALTER PROCEDURE, ALTER AGGREGATE
* - DROP FUNCTION, DROP PROCEDURE, DROP AGGREGATE
*
* - CREATE EXTENSION
* - ALTER EXTENSION
* - DROP EXTENSION
*/
const char *
DeparseTreeNode(Node *stmt)
@ -98,6 +102,16 @@ DeparseTreeNode(Node *stmt)
return DeparseAlterRoleStmt(castNode(AlterRoleStmt, stmt));
}
case T_CreateExtensionStmt:
{
return DeparseCreateExtensionStmt(castNode(CreateExtensionStmt, stmt));
}
case T_AlterExtensionStmt:
{
return DeparseAlterExtensionStmt(castNode(AlterExtensionStmt, stmt));
}
default:
{
ereport(ERROR, (errmsg("unsupported statement for deparsing")));
@ -129,6 +143,11 @@ DeparseDropStmt(DropStmt *stmt)
return DeparseDropFunctionStmt(stmt);
}
case OBJECT_EXTENSION:
{
return DeparseDropExtensionStmt(stmt);
}
default:
{
ereport(ERROR, (errmsg("unsupported drop statement for deparsing")));
@ -250,6 +269,11 @@ DeparseAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
return DeparseAlterFunctionSchemaStmt(stmt);
}
case OBJECT_EXTENSION:
{
return DeparseAlterExtensionSchemaStmt(stmt);
}
default:
{
ereport(ERROR, (errmsg("unsupported rename statement for deparsing")));

View File

@ -0,0 +1,280 @@
/*-------------------------------------------------------------------------
*
* deparse_extension_stmts.c
* All routines to deparse extension statements.
* This file contains deparse functions for extension statement deparsing
* as well as related helper functions.
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/deparser.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
/* Local functions forward declarations for helper functions */
static void AppendCreateExtensionStmt(StringInfo buf, CreateExtensionStmt *stmt);
static void AppendDropExtensionStmt(StringInfo buf, DropStmt *stmt);
static void AppendExtensionNameList(StringInfo buf, List *objects);
static void AppendAlterExtensionSchemaStmt(StringInfo buf,
AlterObjectSchemaStmt *alterExtensionSchemaStmt);
static void AppendAlterExtensionStmt(StringInfo buf,
AlterExtensionStmt *alterExtensionStmt);
/*
* GetExtensionOption returns Value* of DefElem node with "defname" from "options" list
*/
Value *
GetExtensionOption(List *extensionOptions, const char *defname)
{
Value *targetValue = NULL;
ListCell *defElemCell = NULL;
foreach(defElemCell, extensionOptions)
{
DefElem *defElement = (DefElem *) lfirst(defElemCell);
if (IsA(defElement, DefElem) && strncmp(defElement->defname, defname,
NAMEDATALEN) == 0)
{
targetValue = (Value *) defElement->arg;
break;
}
}
/* return target string safely */
if (targetValue)
{
return targetValue;
}
else
{
return NULL;
}
}
/*
* DeparseCreateExtensionStmt builds and returns a string representing the
* CreateExtensionStmt to be sent to worker nodes.
*/
const char *
DeparseCreateExtensionStmt(CreateExtensionStmt *createExtensionStmt)
{
StringInfoData sql = { 0 };
initStringInfo(&sql);
AppendCreateExtensionStmt(&sql, createExtensionStmt);
return sql.data;
}
/*
* AppendCreateExtensionStmt appends a string representing the CreateExtensionStmt to a buffer
*/
static void
AppendCreateExtensionStmt(StringInfo buf, CreateExtensionStmt *createExtensionStmt)
{
const char *extensionName = createExtensionStmt->extname;
List *optionsList = createExtensionStmt->options;
/*
* We fetch "new_version", "schema" and "cascade" options from
* optionList as we will append "IF NOT EXISTS" clause regardless of
* statement's content before propagating it to worker nodes.
* We also do not care old_version for now.
*/
Value *schemaNameValue = GetExtensionOption(optionsList, "schema");
/* these can be NULL hence check before fetching the stored value */
Value *newVersionValue = GetExtensionOption(optionsList, "new_version");
Value *cascadeValue = GetExtensionOption(optionsList, "cascade");
/*
* We do not check for if schemaName is NULL as we append it in deparse
* logic if it is not specified.
*/
const char *schemaName = strVal(schemaNameValue);
schemaName = quote_identifier(schemaName);
appendStringInfo(buf, "CREATE EXTENSION IF NOT EXISTS %s WITH SCHEMA %s",
extensionName, schemaName);
/* "new_version" may not be specified in CreateExtensionStmt */
if (newVersionValue)
{
const char *newVersion = strVal(newVersionValue);
newVersion = quote_identifier(newVersion);
appendStringInfo(buf, " VERSION %s", newVersion);
}
/* "cascade" may not be specified in CreateExtensionStmt */
if (cascadeValue)
{
bool cascade = intVal(cascadeValue);
if (cascade)
{
appendStringInfoString(buf, " CASCADE");
}
}
appendStringInfoString(buf, " ;");
}
/*
* DeparseAlterExtensionStmt builds and returns a string representing the
* AlterExtensionStmt to be sent to worker nodes.
*/
const char *
DeparseAlterExtensionStmt(AlterExtensionStmt *alterExtensionStmt)
{
StringInfoData sql = { 0 };
initStringInfo(&sql);
AppendAlterExtensionStmt(&sql, alterExtensionStmt);
return sql.data;
}
/*
* AppendAlterExtensionStmt appends a string representing the AlterExtensionStmt to a buffer
*/
static void
AppendAlterExtensionStmt(StringInfo buf, AlterExtensionStmt *alterExtensionStmt)
{
const char *extensionName = alterExtensionStmt->extname;
List *optionsList = alterExtensionStmt->options;
Value *newVersionValue = GetExtensionOption(optionsList, "new_version");
appendStringInfo(buf, "ALTER EXTENSION %s UPDATE ", extensionName);
/* "new_version" may not be specified in AlterExtensionStmt */
if (newVersionValue)
{
const char *newVersion = strVal(newVersionValue);
newVersion = quote_identifier(newVersion);
appendStringInfo(buf, " TO %s", newVersion);
}
appendStringInfoString(buf, ";");
}
/*
* DeparseDropExtensionStmt builds and returns a string representing the DropStmt
*/
const char *
DeparseDropExtensionStmt(DropStmt *dropStmt)
{
StringInfoData str = { 0 };
initStringInfo(&str);
AppendDropExtensionStmt(&str, dropStmt);
return str.data;
}
/*
* AppendDropExtensionStmt appends a string representing the DropStmt for
* an extension to a buffer.
*/
static void
AppendDropExtensionStmt(StringInfo str, DropStmt *dropStmt)
{
/* we append "IF NOT EXISTS" clause regardless of the content of the statement. */
appendStringInfoString(str, "DROP EXTENSION IF EXISTS ");
/*
* Pick the distributed ones from the "objects" list that is storing
* the object names to be deleted.
*/
AppendExtensionNameList(str, dropStmt->objects);
/* depending on behaviour field of DropStmt, we should append CASCADE or RESTRICT */
if (dropStmt->behavior == DROP_CASCADE)
{
appendStringInfoString(str, " CASCADE;");
}
else
{
appendStringInfoString(str, " RESTRICT;");
}
}
/*
* AppendExtensionNameList appends a string representing the list of
* extension names to a buffer.
*/
static void
AppendExtensionNameList(StringInfo str, List *objects)
{
ListCell *objectCell = NULL;
foreach(objectCell, objects)
{
const char *extensionName = strVal(lfirst(objectCell));
if (objectCell != list_head(objects))
{
appendStringInfo(str, ", ");
}
appendStringInfoString(str, extensionName);
}
}
/*
* DeparseAlterExtensionSchemaStmt builds and returns a string representing the
* AlterObjectSchemaStmt (ALTER EXTENSION SET SCHEMA).
*/
const char *
DeparseAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionSchemaStmt)
{
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(alterExtensionSchemaStmt->objectType == OBJECT_EXTENSION);
AppendAlterExtensionSchemaStmt(&str, alterExtensionSchemaStmt);
return str.data;
}
/*
* AppendAlterExtensionSchemaStmt appends a string representing the AlterObjectSchemaStmt
* for an extension to a buffer.
*/
static void
AppendAlterExtensionSchemaStmt(StringInfo buf,
AlterObjectSchemaStmt *alterExtensionSchemaStmt)
{
const char *extensionName = NULL;
Assert(alterExtensionSchemaStmt->objectType == OBJECT_EXTENSION);
extensionName = strVal(alterExtensionSchemaStmt->object);
appendStringInfo(buf, "ALTER EXTENSION %s SET SCHEMA %s;", extensionName,
quote_identifier(alterExtensionSchemaStmt->newschema));
}

View File

@ -12,8 +12,11 @@
#include "postgres.h"
#include "commands/extension.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_extension_d.h"
static const ObjectAddress * AlterTableStmtObjectAddress(AlterTableStmt *stmt,
bool missing_ok);
@ -26,7 +29,11 @@ static const ObjectAddress * AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt,
bool missing_ok);
static const ObjectAddress * AlterObjectDependsStmtObjectAddress(
AlterObjectDependsStmt *stmt, bool missing_ok);
static const ObjectAddress * CreateExtensionStmtObjectAddress(CreateExtensionStmt *stmt,
bool missing_ok);
static const ObjectAddress * AlterExtensionStmtObjectAddress(
AlterExtensionStmt *alterExtensionStmt, bool
missing_ok);
/*
* GetObjectAddressFromParseTree returns the ObjectAdderss of the main target of the parse
@ -109,6 +116,18 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok)
return NULL;
}
case T_CreateExtensionStmt:
{
return CreateExtensionStmtObjectAddress(castNode(CreateExtensionStmt,
parseTree), missing_ok);
}
case T_AlterExtensionStmt:
{
return AlterExtensionStmtObjectAddress(castNode(AlterExtensionStmt,
parseTree), missing_ok);
}
default:
{
/*
@ -189,6 +208,11 @@ AlterObjectSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok)
return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok);
}
case OBJECT_EXTENSION:
{
return AlterExtensionSchemaStmtObjectAddress(stmt, missing_ok);
}
default:
{
ereport(ERROR, (errmsg("unsupported alter schema statement to get object "
@ -273,3 +297,67 @@ AlterObjectDependsStmtObjectAddress(AlterObjectDependsStmt *stmt, bool missing_o
}
}
}
/*
* CreateExtensionStmtObjectAddress finds the ObjectAddress for the extension described
* by the CreateExtensionStmt. If missing_ok is false, then this function throws an
* error if the extension does not exist.
*
* Never returns NULL, but the objid in the address could be invalid if missing_ok was set
* to true.
*/
static const ObjectAddress *
CreateExtensionStmtObjectAddress(CreateExtensionStmt *createExtensionStmt, bool
missing_ok)
{
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
const char *extensionName = createExtensionStmt->extname;
Oid extensionoid = get_extension_oid(extensionName, missing_ok);
/* if we couldn't find the extension, error if missing_ok is false */
if (!missing_ok && extensionoid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("extension \"%s\" does not exist",
extensionName)));
}
ObjectAddressSet(*address, ExtensionRelationId, extensionoid);
return address;
}
/*
* AlterExtensionStmtObjectAddress finds the ObjectAddress for the extension described
* by the AlterExtensionStmt. If missing_ok is false, then this function throws an
* error if the extension is not created before.
*
* Never returns NULL, but the objid in the address could be invalid if missing_ok was set
* to true.
*/
static const ObjectAddress *
AlterExtensionStmtObjectAddress(AlterExtensionStmt *alterExtensionStmt, bool
missing_ok)
{
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
const char *extensionName = alterExtensionStmt->extname;
Oid extensionoid = get_extension_oid(extensionName, missing_ok);
/* if we couldn't find the extension, error if missing_ok is false */
if (!missing_ok && extensionoid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("extension \"%s\" does not exist",
extensionName)));
}
ObjectAddressSet(*address, ExtensionRelationId, extensionoid);
return address;
}

View File

@ -403,6 +403,11 @@ SupportedDependencyByCitus(const ObjectAddress *address)
return true;
}
case OCLASS_EXTENSION:
{
return true;
}
case OCLASS_TYPE:
{
switch (get_typtype(address->objectId))

View File

@ -23,12 +23,30 @@
/* cluster.c - forward declarations */
extern List * PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterCommand);
/* call.c */
extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest);
/* extension.c - forward declarations */
extern bool IsCitusExtensionStmt(Node *parsetree);
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
extern List * PlanCreateExtensionStmt(CreateExtensionStmt *stmt, const char *queryString);
extern void ProcessCreateExtensionStmt(CreateExtensionStmt *stmt, const
char *queryString);
extern List * PlanDropExtensionStmt(DropStmt *stmt, const char *queryString);
extern List * PlanAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt,
const char *queryString);
extern void ProcessAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt,
const char *queryString);
extern List * PlanAlterExtensionUpdateStmt(AlterExtensionStmt *alterExtensionStmt, const
char *queryString);
extern List * CreateExtensionDDLCommand(const ObjectAddress *extensionAddress);
extern const ObjectAddress * AlterExtensionSchemaStmtObjectAddress(
AlterObjectSchemaStmt *stmt, bool missing_ok);
extern const ObjectAddress * AlterExtensionUpdateStmtObjectAddress(
AlterExtensionStmt *alterExtensionStmt,
bool missing_ok);
/* foreign_constraint.c - forward declarations */

View File

@ -32,6 +32,7 @@ extern void AssertObjectTypeIsFunctional(ObjectType type);
extern void QualifyTreeNode(Node *stmt);
extern const char * DeparseTreeNode(Node *stmt);
/* forward declarations for deparse_type_stmts.c */
extern const char * DeparseCompositeTypeStmt(CompositeTypeStmt *stmt);
extern const char * DeparseCreateEnumStmt(CreateEnumStmt *stmt);
extern const char * DeparseDropTypeStmt(DropStmt *stmt);
@ -69,8 +70,16 @@ extern void QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt);
extern void QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt);
extern void QualifyAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt);
/* forward declarations for deparse_role_stmts.c */
extern const char * DeparseAlterRoleStmt(AlterRoleStmt *stmt);
/* forward declarations for deparse_extension_stmts.c */
extern Value * GetExtensionOption(List *extensionOptions, const
char *defname);
extern const char * DeparseCreateExtensionStmt(CreateExtensionStmt *stmt);
extern const char * DeparseDropExtensionStmt(DropStmt *stmt);
extern const char * DeparseAlterExtensionSchemaStmt(
AlterObjectSchemaStmt *alterExtensionSchemaStmt);
extern const char * DeparseAlterExtensionStmt(AlterExtensionStmt *alterExtensionStmt);
#endif /* CITUS_DEPARSER_H */

View File

@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
289 288 f
340 339 f
transactionnumberwaitingtransactionnumbers
288
289 288
339
340 339
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
293 292 f
294 292 f
294 293 t
344 343 f
345 343 f
345 344 t
transactionnumberwaitingtransactionnumbers
292
293 292
294 292,293
343
344 343
345 343,344
step s1-abort:
ABORT;

View File

@ -0,0 +1,709 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-add-node-1 s2-create-extension-version-11 s1-commit s1-print
?column?
1
step s1-begin:
BEGIN;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-create-extension-version-11:
CREATE extension seg VERSION "1.1";
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-extension-version-11: <... completed>
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
1
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.1 public
run_command_on_workers
(localhost,57637,t,seg)
(localhost,57638,t,seg)
run_command_on_workers
(localhost,57637,t,1.1)
(localhost,57638,t,1.1)
run_command_on_workers
(localhost,57637,t,public)
(localhost,57638,t,public)
master_remove_node
starting permutation: s1-begin s1-add-node-1 s2-alter-extension-update-to-version-12 s1-commit s1-print
?column?
1
step s1-begin:
BEGIN;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-alter-extension-update-to-version-12:
ALTER extension seg update to "1.2";
<waiting ...>
step s1-commit:
COMMIT;
step s2-alter-extension-update-to-version-12: <... completed>
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
1
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.2 public
run_command_on_workers
(localhost,57637,t,seg)
(localhost,57638,t,seg)
run_command_on_workers
(localhost,57637,t,1.2)
(localhost,57638,t,1.2)
run_command_on_workers
(localhost,57637,t,public)
(localhost,57638,t,public)
master_remove_node
starting permutation: s1-add-node-1 s1-begin s1-remove-node-1 s2-drop-extension s1-commit s1-print
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-remove-node-1:
SELECT 1 FROM master_remove_node('localhost', 57637);
?column?
1
step s2-drop-extension:
drop extension seg;
<waiting ...>
step s1-commit:
COMMIT;
step s2-drop-extension: <... completed>
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
master_remove_node
starting permutation: s1-begin s1-add-node-1 s2-create-extension-with-schema1 s1-commit s1-print
?column?
1
step s1-begin:
BEGIN;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-create-extension-with-schema1:
create schema if not exists schema1;
CREATE extension seg with schema schema1;
step s1-commit:
COMMIT;
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.3 schema1
run_command_on_workers
(localhost,57637,t,seg)
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,1.2)
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,public)
(localhost,57638,t,"")
master_remove_node
starting permutation: s1-begin s1-add-node-1 s2-drop-extension-cascade s1-commit s1-print
?column?
1
step s1-begin:
BEGIN;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-drop-extension-cascade:
drop extension seg cascade;
step s1-commit:
COMMIT;
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
run_command_on_workers
(localhost,57637,t,seg)
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,1.2)
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,public)
(localhost,57638,t,"")
master_remove_node
starting permutation: s1-add-node-1 s1-create-extension-with-schema2 s1-begin s1-remove-node-1 s2-alter-extension-set-schema4 s1-commit s1-print
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-create-extension-with-schema2:
create schema if not exists schema2;
CREATE extension seg with schema schema2;
step s1-begin:
BEGIN;
step s1-remove-node-1:
SELECT 1 FROM master_remove_node('localhost', 57637);
?column?
1
step s2-alter-extension-set-schema4:
CREATE schema if not exists schema4;
alter extension seg set schema schema4;
<waiting ...>
step s1-commit:
COMMIT;
step s2-alter-extension-set-schema4: <... completed>
error in steps s1-commit s2-alter-extension-set-schema4: ERROR: extension "seg" does not exist
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.3 schema2
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
master_remove_node
starting permutation: s1-add-node-1 s1-begin s1-remove-node-1 s2-create-extension-with-schema1 s1-commit s1-print
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-remove-node-1:
SELECT 1 FROM master_remove_node('localhost', 57637);
?column?
1
step s2-create-extension-with-schema1:
create schema if not exists schema1;
CREATE extension seg with schema schema1;
ERROR: extension "seg" already exists
step s1-commit:
COMMIT;
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.3 schema2
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
master_remove_node
starting permutation: s2-drop-extension s2-begin s2-create-extension-version-11 s1-add-node-1 s2-commit s1-print
?column?
1
step s2-drop-extension:
drop extension seg;
step s2-begin:
BEGIN;
step s2-create-extension-version-11:
CREATE extension seg VERSION "1.1";
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-commit:
COMMIT;
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.1 public
run_command_on_workers
(localhost,57637,t,seg)
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,1.2)
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,public)
(localhost,57638,t,"")
master_remove_node
starting permutation: s2-drop-extension s2-create-extension-version-11 s2-begin s2-alter-extension-update-to-version-12 s1-add-node-1 s2-commit s1-print
?column?
1
step s2-drop-extension:
drop extension seg;
step s2-create-extension-version-11:
CREATE extension seg VERSION "1.1";
step s2-begin:
BEGIN;
step s2-alter-extension-update-to-version-12:
ALTER extension seg update to "1.2";
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-node-1: <... completed>
?column?
1
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
1
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.2 public
run_command_on_workers
(localhost,57637,t,seg)
(localhost,57638,t,seg)
run_command_on_workers
(localhost,57637,t,1.2)
(localhost,57638,t,1.2)
run_command_on_workers
(localhost,57637,t,public)
(localhost,57638,t,public)
master_remove_node
starting permutation: s2-add-node-1 s2-begin s2-drop-extension s1-remove-node-1 s2-commit s1-print
?column?
1
step s2-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-begin:
BEGIN;
step s2-drop-extension:
drop extension seg;
step s1-remove-node-1:
SELECT 1 FROM master_remove_node('localhost', 57637);
<waiting ...>
step s2-commit:
COMMIT;
step s1-remove-node-1: <... completed>
?column?
1
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
master_remove_node
starting permutation: s2-begin s2-create-extension-with-schema1 s1-add-node-1 s2-commit s1-print
?column?
1
step s2-begin:
BEGIN;
step s2-create-extension-with-schema1:
create schema if not exists schema1;
CREATE extension seg with schema schema1;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-commit:
COMMIT;
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.3 schema1
run_command_on_workers
(localhost,57637,t,"")
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,"")
(localhost,57638,t,"")
run_command_on_workers
(localhost,57637,t,"")
(localhost,57638,t,"")
master_remove_node
starting permutation: s2-add-node-1 s2-drop-extension-cascade s2-create-extension-with-schema2 s2-begin s2-alter-extension-version-13 s1-remove-node-1 s2-commit s1-print
?column?
1
step s2-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-drop-extension-cascade:
drop extension seg cascade;
step s2-create-extension-with-schema2:
create schema if not exists schema2;
CREATE extension seg with schema schema2;
step s2-begin:
BEGIN;
step s2-alter-extension-version-13:
ALTER extension seg update to "1.3";
ERROR: extension "seg" does not exist
step s1-remove-node-1:
SELECT 1 FROM master_remove_node('localhost', 57637);
?column?
1
step s2-commit:
COMMIT;
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.3 schema2
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
master_remove_node
starting permutation: s2-drop-extension s2-add-node-1 s2-begin s2-create-extension-version-11 s1-remove-node-1 s2-commit s1-print
?column?
1
step s2-drop-extension:
drop extension seg;
step s2-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-begin:
BEGIN;
step s2-create-extension-version-11:
CREATE extension seg VERSION "1.1";
step s1-remove-node-1:
SELECT 1 FROM master_remove_node('localhost', 57637);
?column?
1
step s2-commit:
COMMIT;
step s1-print:
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
count
0
extname extversion nspname
citus 9.1-1 pg_catalog
plpgsql 1.0 pg_catalog
seg 1.1 public
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
run_command_on_workers
(localhost,57638,t,"")
master_remove_node

View File

@ -0,0 +1,392 @@
CREATE SCHEMA "extension'test";
-- use a schema name with escape character
SET search_path TO "extension'test";
-- create an extension on the given search_path
-- the extension is on contrib, so should be avaliable for the regression tests
CREATE EXTENSION seg;
-- make sure that both the schema and the extension is distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
count
-------
1
(1 row)
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test');
count
-------
1
(1 row)
CREATE TABLE test_table (key int, value seg);
SELECT create_distributed_table('test_table', 'key');
create_distributed_table
--------------------------
(1 row)
-- make sure that the table is also distributed now
SELECT count(*) from pg_dist_partition where logicalrelid='extension''test.test_table'::regclass;
count
-------
1
(1 row)
CREATE TYPE two_segs AS (seg_1 seg, seg_2 seg);
-- verify that the type that depends on the extension is also marked as distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_type WHERE typname = 'two_segs' AND typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test'));
count
-------
1
(1 row)
-- now try to run CREATE EXTENSION within a transction block, all should work fine
BEGIN;
CREATE EXTENSION isn WITH SCHEMA public;
-- now, try create a reference table relying on the data types
-- this should not succeed as we do not distribute extension commands within transaction blocks
CREATE TABLE dist_table (key int, value public.issn);
SELECT create_distributed_table('dist_table', 'key');
create_distributed_table
--------------------------
(1 row)
-- we can even run queries (sequentially) over the distributed table
SELECT * FROM dist_table;
key | value
-----+-------
(0 rows)
INSERT INTO dist_table VALUES (1, public.issn('1436-4522'));
INSERT INTO dist_table SELECT * FROM dist_table RETURNING *;
key | value
-----+-----------
1 | 1436-4522
(1 row)
COMMIT;
-- make sure that the extension is distributed even if we run create extension in a transaction block
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn');
count
-------
1
(1 row)
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$);
run_command_on_workers
------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
CREATE TABLE ref_table (a public.issn);
-- now, create a reference table relying on the data types
SELECT create_reference_table('ref_table');
create_reference_table
------------------------
(1 row)
-- now, drop the extension, recreate it with an older version and update it to latest version
SET client_min_messages TO WARNING;
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH VERSION "1.1";
RESET client_min_messages;
-- before updating the version, ensure the current version
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$);
run_command_on_workers
-------------------------
(localhost,57637,t,1.1)
(localhost,57638,t,1.1)
(2 rows)
-- now, update to a newer version
ALTER EXTENSION isn UPDATE TO '1.2';
-- show that ALTER EXTENSION is propagated
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$);
run_command_on_workers
-------------------------
(localhost,57637,t,1.2)
(localhost,57638,t,1.2)
(2 rows)
-- before changing the schema, ensure the current schmea
SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$);
run_command_on_workers
------------------------------------
(localhost,57637,t,extension'test)
(localhost,57638,t,extension'test)
(2 rows)
-- now change the schema
ALTER EXTENSION isn SET SCHEMA public;
-- switch back to public schema as we set extension's schema to public
SET search_path TO public;
-- make sure that the extension is distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn');
count
-------
1
(1 row)
-- show that the ALTER EXTENSION command is propagated
SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$);
run_command_on_workers
----------------------------
(localhost,57637,t,public)
(localhost,57638,t,public)
(2 rows)
-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement
SET client_min_messages TO WARNING;
-- drop the extension finally
DROP EXTENSION isn CASCADE;
-- restore client_min_messages after DROP EXTENSION
RESET client_min_messages;
-- now make sure that the reference tables depending on an extension can be succesfully created.
-- we should also ensure that we replicate this reference table (and hence the extension)
-- to new nodes after calling master_activate_node.
-- now, first drop seg and existing objects before next test
SET client_min_messages TO WARNING;
DROP EXTENSION seg CASCADE;
-- but as we have only 2 ports in postgresql tests, let's remove one of the nodes first
-- before remove, first remove the existing relations (due to the other tests)
DROP SCHEMA "extension'test" CASCADE;
RESET client_min_messages;
SELECT 1 from master_remove_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- then create the extension
CREATE EXTENSION seg;
-- show that the extension is created on existing worker
SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$);
run_command_on_workers
------------------------
(localhost,57637,t,1)
(1 row)
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$);
run_command_on_workers
-------------------------
(localhost,57637,t,1.3)
(1 row)
-- now create the reference table
CREATE TABLE ref_table_2 (x seg);
SELECT create_reference_table('ref_table_2');
create_reference_table
------------------------
(1 row)
-- and add the other node
SELECT 1 from master_add_node('localhost', 57638);
NOTICE: Replicating reference table "ref_table_2" to the node localhost:57638
?column?
----------
1
(1 row)
-- show that the extension is created on both existing and new node
SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$);
run_command_on_workers
------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$);
run_command_on_workers
-------------------------
(localhost,57637,t,1.3)
(localhost,57638,t,1.3)
(2 rows)
-- and similarly check for the reference table
select count(*) from pg_dist_partition where partmethod='n' and logicalrelid='ref_table_2'::regclass;
count
-------
1
(1 row)
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='ref_table_2'::regclass;
count
-------
1
(1 row)
DROP TABLE ref_table_2;
-- now test create extension in another transaction block but rollback this time
BEGIN;
CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public;
ROLLBACK;
-- at the end of the transaction block, we did not create isn extension in coordinator or worker nodes as we rollback'ed
-- make sure that the extension is not distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn');
count
-------
0
(1 row)
-- and the extension does not exist on workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$);
run_command_on_workers
------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- give a notice for the following commands saying that it is not
-- propagated to the workers. the user should run it manually on the workers
CREATE TABLE t1 (A int);
CREATE VIEW v1 AS select * from t1;
ALTER EXTENSION seg ADD VIEW v1;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
ALTER EXTENSION seg DROP VIEW v1;
NOTICE: Citus does not propagate adding/dropping member objects
HINT: You can add/drop the member objects on the workers as well.
DROP VIEW v1;
DROP TABLE t1;
-- drop multiple extensions at the same time
CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public;
-- let's create another extension locally
set citus.enable_ddl_propagation to 'off';
CREATE EXTENSION pg_buffercache;
set citus.enable_ddl_propagation to 'on';
-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement
SET client_min_messages TO WARNING;
DROP EXTENSION pg_buffercache, isn CASCADE;
SELECT count(*) FROM pg_extension WHERE extname IN ('pg_buffercache', 'isn');
count
-------
0
(1 row)
-- restore client_min_messages after DROP EXTENSION
RESET client_min_messages;
-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement
SET client_min_messages TO WARNING;
-- drop extension should just work
DROP EXTENSION seg CASCADE;
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
count
-------
0
(1 row)
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'seg'$$);
run_command_on_workers
------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- restore client_min_messages after DROP EXTENSION
RESET client_min_messages;
-- make sure that the extension is not avaliable anymore as a distributed object
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn'));
count
-------
0
(1 row)
CREATE SCHEMA "extension'test";
SET search_path TO "extension'test";
-- check restriction for sequential execution
-- enable it and see that create command errors but continues its execution by changing citus.multi_shard_modify_mode TO 'off
BEGIN;
CREATE TABLE some_random_table (a int);
SELECT create_distributed_table('some_random_table', 'a');
create_distributed_table
--------------------------
(1 row)
CREATE EXTENSION seg;
CREATE TABLE some_random_table_2 (a int, b seg);
SELECT create_distributed_table('some_random_table_2', 'a');
create_distributed_table
--------------------------
(1 row)
ROLLBACK;
-- show that the CREATE EXTENSION command propagated even if the transaction
-- block is rollbacked, that's a shortcoming of dependency creation logic
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$);
run_command_on_workers
-------------------------
(localhost,57637,t,1.3)
(localhost,57638,t,1.3)
(2 rows)
-- drop the schema and all the objects
SET client_min_messages TO WARNING;
DROP SCHEMA "extension'test" CASCADE;
-- recreate for the next tests
CREATE SCHEMA "extension'test";
-- use a schema name with escape character
SET search_path TO "extension'test";
RESET client_min_messages;
-- remove the node, we'll add back again
SELECT 1 from master_remove_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- now, create a type that depends on another type, which
-- finally depends on an extension
BEGIN;
SET citus.shard_replication_factor TO 1;
CREATE EXTENSION seg;
CREATE EXTENSION isn;
CREATE TYPE test_type AS (a int, b seg);
CREATE TYPE test_type_2 AS (a int, b test_type);
CREATE TABLE t2 (a int, b test_type_2, c issn);
SELECT create_distributed_table('t2', 'a');
create_distributed_table
--------------------------
(1 row)
CREATE TYPE test_type_3 AS (a int, b test_type, c issn);
CREATE TABLE t3 (a int, b test_type_3);
SELECT create_reference_table('t3');
create_reference_table
------------------------
(1 row)
COMMIT;
-- add the node back
SELECT 1 from master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "t3" to the node localhost:57638
?column?
----------
1
(1 row)
-- make sure that both extensions are created on both nodes
SELECT count(*) FROM citus.pg_dist_object WHERE objid IN (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn'));
count
-------
2
(1 row)
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname IN ('seg', 'isn')$$);
run_command_on_workers
------------------------
(localhost,57637,t,2)
(localhost,57638,t,2)
(2 rows)
-- drop the schema and all the objects
SET client_min_messages TO WARNING;
DROP SCHEMA "extension'test" CASCADE;

View File

@ -7,6 +7,8 @@ test: isolation_master_update_node
test: isolation_ensure_dependency_activate_node
test: isolation_shouldhaveshards
test: isolation_extension_commands
# tests that change node metadata should precede
# isolation_cluster_management such that tests
# that come later can be parallelized
@ -19,7 +21,6 @@ test: isolation_distributed_transaction_id
test: isolation_dump_global_wait_edges
test: isolation_citus_dist_activity
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement
test: isolation_concurrent_dml isolation_data_migration

View File

@ -14,11 +14,16 @@
# ---
# Tests around schema changes, these are run first, so there's no preexisting objects.
#
# propagate_extension_commands lies just after multi_cluster_management as we do
# remove / add node operations, we do not want any preexisting objects before
# propagate_extension_commands
# ---
test: multi_extension
test: multi_703_upgrade
test: multi_cluster_management
test: alter_role_propagation
test: propagate_extension_commands
test: multi_test_helpers
test: multi_table_ddl
test: multi_name_lengths

View File

@ -0,0 +1,154 @@
setup
{
SELECT 1 FROM master_add_node('localhost', 57638);
}
teardown
{
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-add-node-1"
{
SELECT 1 FROM master_add_node('localhost', 57637);
}
step "s1-activate-node-1"
{
SELECT 1 FROM master_activate_node('localhost', 57637);
}
step "s1-disable-node-1"
{
SELECT 1 FROM master_disable_node('localhost', 57637);
}
step "s1-remove-node-1"
{
SELECT 1 FROM master_remove_node('localhost', 57637);
}
step "s1-commit"
{
COMMIT;
}
step "s1-alter-extension-12"
{
alter extension seg update to "1.2";
}
step "s1-drop-extension-cascade"
{
drop extension seg cascade;
}
step "s1-create-extension-with-schema2"
{
create schema if not exists schema2;
CREATE extension seg with schema schema2;
}
step "s1-alter-extension-set-schema2"
{
create schema if not exists schema2;
alter extension seg set schema schema2;
}
step "s1-print"
{
select count(*) from citus.pg_dist_object ;
select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3;
SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$);
SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$);
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-add-node-1"
{
SELECT 1 FROM master_add_node('localhost', 57637);
}
step "s2-create-extension-version-11"
{
CREATE extension seg VERSION "1.1";
}
step "s2-alter-extension-version-13"
{
ALTER extension seg update to "1.3";
}
step "s2-create-extension-with-schema1"
{
create schema if not exists schema1;
CREATE extension seg with schema schema1;
}
step "s2-create-extension-with-schema2"
{
create schema if not exists schema2;
CREATE extension seg with schema schema2;
}
step "s2-drop-extension"
{
drop extension seg;
}
step "s2-drop-extension-cascade"
{
drop extension seg cascade;
}
step "s2-alter-extension-update-to-version-12"
{
ALTER extension seg update to "1.2";
}
step "s2-alter-extension-set-schema4"
{
CREATE schema if not exists schema4;
alter extension seg set schema schema4;
}
step "s2-alter-extension-set-schema-public"
{
alter extension seg set schema public;
}
step "s2-commit"
{
COMMIT;
}
# master_#_node vs extension command
permutation "s1-begin" "s1-add-node-1" "s2-create-extension-version-11" "s1-commit" "s1-print"
permutation "s1-begin" "s1-add-node-1" "s2-alter-extension-update-to-version-12" "s1-commit" "s1-print"
permutation "s1-add-node-1" "s1-begin" "s1-remove-node-1" "s2-drop-extension" "s1-commit" "s1-print"
permutation "s1-begin" "s1-add-node-1" "s2-create-extension-with-schema1" "s1-commit" "s1-print"
permutation "s1-begin" "s1-add-node-1" "s2-drop-extension-cascade" "s1-commit" "s1-print"
permutation "s1-add-node-1" "s1-create-extension-with-schema2" "s1-begin" "s1-remove-node-1" "s2-alter-extension-set-schema4" "s1-commit" "s1-print"
permutation "s1-add-node-1" "s1-begin" "s1-remove-node-1" "s2-create-extension-with-schema1" "s1-commit" "s1-print"
# extension command vs master_#_node
permutation "s2-drop-extension" "s2-begin" "s2-create-extension-version-11" "s1-add-node-1" "s2-commit" "s1-print"
permutation "s2-drop-extension" "s2-create-extension-version-11" "s2-begin" "s2-alter-extension-update-to-version-12" "s1-add-node-1" "s2-commit" "s1-print"
permutation "s2-add-node-1" "s2-begin" "s2-drop-extension" "s1-remove-node-1" "s2-commit" "s1-print"
permutation "s2-begin" "s2-create-extension-with-schema1" "s1-add-node-1" "s2-commit" "s1-print"
permutation "s2-add-node-1" "s2-drop-extension-cascade" "s2-create-extension-with-schema2" "s2-begin" "s2-alter-extension-version-13" "s1-remove-node-1" "s2-commit" "s1-print"
permutation "s2-drop-extension" "s2-add-node-1" "s2-begin" "s2-create-extension-version-11" "s1-remove-node-1" "s2-commit" "s1-print"

View File

@ -0,0 +1,234 @@
CREATE SCHEMA "extension'test";
-- use a schema name with escape character
SET search_path TO "extension'test";
-- create an extension on the given search_path
-- the extension is on contrib, so should be avaliable for the regression tests
CREATE EXTENSION seg;
-- make sure that both the schema and the extension is distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test');
CREATE TABLE test_table (key int, value seg);
SELECT create_distributed_table('test_table', 'key');
-- make sure that the table is also distributed now
SELECT count(*) from pg_dist_partition where logicalrelid='extension''test.test_table'::regclass;
CREATE TYPE two_segs AS (seg_1 seg, seg_2 seg);
-- verify that the type that depends on the extension is also marked as distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_type WHERE typname = 'two_segs' AND typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test'));
-- now try to run CREATE EXTENSION within a transction block, all should work fine
BEGIN;
CREATE EXTENSION isn WITH SCHEMA public;
-- now, try create a reference table relying on the data types
-- this should not succeed as we do not distribute extension commands within transaction blocks
CREATE TABLE dist_table (key int, value public.issn);
SELECT create_distributed_table('dist_table', 'key');
-- we can even run queries (sequentially) over the distributed table
SELECT * FROM dist_table;
INSERT INTO dist_table VALUES (1, public.issn('1436-4522'));
INSERT INTO dist_table SELECT * FROM dist_table RETURNING *;
COMMIT;
-- make sure that the extension is distributed even if we run create extension in a transaction block
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn');
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$);
CREATE TABLE ref_table (a public.issn);
-- now, create a reference table relying on the data types
SELECT create_reference_table('ref_table');
-- now, drop the extension, recreate it with an older version and update it to latest version
SET client_min_messages TO WARNING;
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH VERSION "1.1";
RESET client_min_messages;
-- before updating the version, ensure the current version
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$);
-- now, update to a newer version
ALTER EXTENSION isn UPDATE TO '1.2';
-- show that ALTER EXTENSION is propagated
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$);
-- before changing the schema, ensure the current schmea
SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$);
-- now change the schema
ALTER EXTENSION isn SET SCHEMA public;
-- switch back to public schema as we set extension's schema to public
SET search_path TO public;
-- make sure that the extension is distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn');
-- show that the ALTER EXTENSION command is propagated
SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$);
-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement
SET client_min_messages TO WARNING;
-- drop the extension finally
DROP EXTENSION isn CASCADE;
-- restore client_min_messages after DROP EXTENSION
RESET client_min_messages;
-- now make sure that the reference tables depending on an extension can be succesfully created.
-- we should also ensure that we replicate this reference table (and hence the extension)
-- to new nodes after calling master_activate_node.
-- now, first drop seg and existing objects before next test
SET client_min_messages TO WARNING;
DROP EXTENSION seg CASCADE;
-- but as we have only 2 ports in postgresql tests, let's remove one of the nodes first
-- before remove, first remove the existing relations (due to the other tests)
DROP SCHEMA "extension'test" CASCADE;
RESET client_min_messages;
SELECT 1 from master_remove_node('localhost', :worker_2_port);
-- then create the extension
CREATE EXTENSION seg;
-- show that the extension is created on existing worker
SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$);
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$);
-- now create the reference table
CREATE TABLE ref_table_2 (x seg);
SELECT create_reference_table('ref_table_2');
-- and add the other node
SELECT 1 from master_add_node('localhost', 57638);
-- show that the extension is created on both existing and new node
SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$);
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$);
-- and similarly check for the reference table
select count(*) from pg_dist_partition where partmethod='n' and logicalrelid='ref_table_2'::regclass;
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='ref_table_2'::regclass;
DROP TABLE ref_table_2;
-- now test create extension in another transaction block but rollback this time
BEGIN;
CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public;
ROLLBACK;
-- at the end of the transaction block, we did not create isn extension in coordinator or worker nodes as we rollback'ed
-- make sure that the extension is not distributed
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn');
-- and the extension does not exist on workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$);
-- give a notice for the following commands saying that it is not
-- propagated to the workers. the user should run it manually on the workers
CREATE TABLE t1 (A int);
CREATE VIEW v1 AS select * from t1;
ALTER EXTENSION seg ADD VIEW v1;
ALTER EXTENSION seg DROP VIEW v1;
DROP VIEW v1;
DROP TABLE t1;
-- drop multiple extensions at the same time
CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public;
-- let's create another extension locally
set citus.enable_ddl_propagation to 'off';
CREATE EXTENSION pg_buffercache;
set citus.enable_ddl_propagation to 'on';
-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement
SET client_min_messages TO WARNING;
DROP EXTENSION pg_buffercache, isn CASCADE;
SELECT count(*) FROM pg_extension WHERE extname IN ('pg_buffercache', 'isn');
-- restore client_min_messages after DROP EXTENSION
RESET client_min_messages;
-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement
SET client_min_messages TO WARNING;
-- drop extension should just work
DROP EXTENSION seg CASCADE;
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'seg'$$);
-- restore client_min_messages after DROP EXTENSION
RESET client_min_messages;
-- make sure that the extension is not avaliable anymore as a distributed object
SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn'));
CREATE SCHEMA "extension'test";
SET search_path TO "extension'test";
-- check restriction for sequential execution
-- enable it and see that create command errors but continues its execution by changing citus.multi_shard_modify_mode TO 'off
BEGIN;
CREATE TABLE some_random_table (a int);
SELECT create_distributed_table('some_random_table', 'a');
CREATE EXTENSION seg;
CREATE TABLE some_random_table_2 (a int, b seg);
SELECT create_distributed_table('some_random_table_2', 'a');
ROLLBACK;
-- show that the CREATE EXTENSION command propagated even if the transaction
-- block is rollbacked, that's a shortcoming of dependency creation logic
SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$);
-- drop the schema and all the objects
SET client_min_messages TO WARNING;
DROP SCHEMA "extension'test" CASCADE;
-- recreate for the next tests
CREATE SCHEMA "extension'test";
-- use a schema name with escape character
SET search_path TO "extension'test";
RESET client_min_messages;
-- remove the node, we'll add back again
SELECT 1 from master_remove_node('localhost', :worker_2_port);
-- now, create a type that depends on another type, which
-- finally depends on an extension
BEGIN;
SET citus.shard_replication_factor TO 1;
CREATE EXTENSION seg;
CREATE EXTENSION isn;
CREATE TYPE test_type AS (a int, b seg);
CREATE TYPE test_type_2 AS (a int, b test_type);
CREATE TABLE t2 (a int, b test_type_2, c issn);
SELECT create_distributed_table('t2', 'a');
CREATE TYPE test_type_3 AS (a int, b test_type, c issn);
CREATE TABLE t3 (a int, b test_type_3);
SELECT create_reference_table('t3');
COMMIT;
-- add the node back
SELECT 1 from master_add_node('localhost', :worker_2_port);
-- make sure that both extensions are created on both nodes
SELECT count(*) FROM citus.pg_dist_object WHERE objid IN (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn'));
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname IN ('seg', 'isn')$$);
-- drop the schema and all the objects
SET client_min_messages TO WARNING;
DROP SCHEMA "extension'test" CASCADE;