Propagate foreign server ops

pull/5468/head
Ahmet Gedemenli 2021-11-16 20:44:21 +03:00
parent 61b5fb1cfc
commit 042d45b263
19 changed files with 985 additions and 21 deletions

View File

@ -288,6 +288,11 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return CreateExtensionDDLCommand(dependency);
}
case OCLASS_FOREIGN_SERVER:
{
return GetForeignServerCreateDDLCommand(dependency->objectId);
}
default:
{
break;

View File

@ -89,6 +89,14 @@ static DistributeObjectOps Any_AlterExtensionContents = {
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Any_AlterForeignServer = {
.deparse = DeparseAlterForeignServerStmt,
.qualify = NULL,
.preprocess = PreprocessAlterForeignServerStmt,
.postprocess = NULL,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Any_AlterFunction = {
.deparse = DeparseAlterFunctionStmt,
.qualify = QualifyAlterFunctionStmt,
@ -177,6 +185,14 @@ static DistributeObjectOps Any_CreatePolicy = {
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Any_CreateForeignServer = {
.deparse = DeparseCreateForeignServerStmt,
.qualify = NULL,
.preprocess = PreprocessCreateForeignServerStmt,
.postprocess = PostprocessCreateForeignServerStmt,
.address = CreateForeignServerStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Any_CreateStatistics = {
.deparse = DeparseCreateStatisticsStmt,
.qualify = QualifyCreateStatisticsStmt,
@ -297,6 +313,30 @@ static DistributeObjectOps Extension_Drop = {
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps ForeignServer_Drop = {
.deparse = DeparseDropForeignServerStmt,
.qualify = NULL,
.preprocess = PreprocessDropForeignServerStmt,
.postprocess = NULL,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps ForeignServer_Rename = {
.deparse = DeparseAlterForeignServerRenameStmt,
.qualify = NULL,
.preprocess = PreprocessRenameForeignServerStmt,
.postprocess = NULL,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps ForeignServer_AlterOwner = {
.deparse = DeparseAlterForeignServerOwnerStmt,
.qualify = NULL,
.preprocess = PreprocessAlterForeignServerOwnerStmt,
.postprocess = PostprocessAlterForeignServerOwnerStmt,
.address = AlterForeignServerOwnerStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps ForeignTable_AlterTable = {
.deparse = NULL,
.qualify = NULL,
@ -675,6 +715,11 @@ GetDistributeObjectOps(Node *node)
return &Any_AlterFunction;
}
case T_AlterForeignServerStmt:
{
return &Any_AlterForeignServer;
}
case T_AlterObjectDependsStmt:
{
AlterObjectDependsStmt *stmt = castNode(AlterObjectDependsStmt, node);
@ -789,6 +834,11 @@ GetDistributeObjectOps(Node *node)
return &Database_AlterOwner;
}
case OBJECT_FOREIGN_SERVER:
{
return &ForeignServer_AlterOwner;
}
case OBJECT_FUNCTION:
{
return &Function_AlterOwner;
@ -915,6 +965,11 @@ GetDistributeObjectOps(Node *node)
return &Any_CreateFunction;
}
case T_CreateForeignServerStmt:
{
return &Any_CreateForeignServer;
}
case T_CreatePolicyStmt:
{
return &Any_CreatePolicy;
@ -977,6 +1032,11 @@ GetDistributeObjectOps(Node *node)
return &Function_Drop;
}
case OBJECT_FOREIGN_SERVER:
{
return &ForeignServer_Drop;
}
case OBJECT_INDEX:
{
return &Index_Drop;
@ -1081,6 +1141,11 @@ GetDistributeObjectOps(Node *node)
return &Collation_Rename;
}
case OBJECT_FOREIGN_SERVER:
{
return &ForeignServer_Rename;
}
case OBJECT_FUNCTION:
{
return &Function_Rename;

View File

@ -797,7 +797,7 @@ CreateExtensionDDLCommand(const ObjectAddress *extensionAddress)
/*
* RecreateEnumStmt returns a parsetree for a CREATE EXTENSION statement that would
* RecreateExtensionStmt returns a parsetree for a CREATE EXTENSION statement that would
* recreate the given extension on a new node.
*/
static Node *

View File

@ -0,0 +1,364 @@
/*-------------------------------------------------------------------------
*
* foreign_server.c
* Commands for FOREIGN SERVER statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_foreign_server.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
static Node * RecreateForeignServerStmt(Oid serverId);
static bool NameListHasDistributedServer(List *serverNames);
static ObjectAddress GetObjectAddressByServerName(char *serverName, bool missing_ok);
/*
* PreprocessCreateForeignServerStmt is called during the planning phase for
* CREATE SERVER.
*/
List *
PreprocessCreateForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterForeignServerStmt is called during the planning phase for
* ALTER SERVER .. OPTIONS ..
*/
List *
PreprocessAlterForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterForeignServerStmt *stmt = castNode(AlterForeignServerStmt, node);
ObjectAddress address = GetObjectAddressByServerName(stmt->servername, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessRenameForeignServerStmt is called during the planning phase for
* ALTER SERVER RENAME.
*/
List *
PreprocessRenameForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_FOREIGN_SERVER);
ObjectAddress address = GetObjectAddressByServerName(strVal(stmt->object), false);
/* filter distributed servers */
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterForeignServerOwnerStmt is called during the planning phase for
* ALTER SERVER .. OWNER TO.
*/
List *
PreprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_FOREIGN_SERVER);
ObjectAddress address = GetObjectAddressByServerName(strVal(stmt->object), false);
/* filter distributed servers */
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessDropForeignServerStmt is called during the planning phase for
* DROP SERVER.
*/
List *
PreprocessDropForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
Assert(stmt->removeType == OBJECT_FOREIGN_SERVER);
bool includesDistributedServer = NameListHasDistributedServer(stmt->objects);
if (!includesDistributedServer)
{
return NIL;
}
if (list_length(stmt->objects) > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot drop distributed server with other servers"),
errhint("Try dropping each object in a separate DROP command")));
}
if (!ShouldPropagate())
{
return NIL;
}
EnsureCoordinator();
Assert(list_length(stmt->objects) == 1);
Value *serverValue = linitial(stmt->objects);
ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false);
/* unmark distributed server */
UnmarkObjectDistributed(&address);
const char *deparsedStmt = DeparseTreeNode((Node *) stmt);
/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) deparsedStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessCreateForeignServerStmt is called after a CREATE SERVER command has
* been executed by standard process utility.
*/
List *
PostprocessCreateForeignServerStmt(Node *node, const char *queryString)
{
bool missingOk = false;
ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk);
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PostprocessAlterForeignServerOwnerStmt is called after a ALTER SERVER OWNER command
* has been executed by standard process utility.
*/
List *
PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString)
{
bool missingOk = false;
ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk);
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* CreateForeignServerStmtObjectAddress finds the ObjectAddress for the server
* that is created by given CreateForeignServerStmt. If missingOk is false and if
* the server does not exist, then it errors out.
*
* Never returns NULL, but the objid in the address can be invalid if missingOk
* was set to true.
*/
ObjectAddress
CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok)
{
CreateForeignServerStmt *stmt = castNode(CreateForeignServerStmt, node);
return GetObjectAddressByServerName(stmt->servername, missing_ok);
}
/*
* AlterForeignServerOwnerStmtObjectAddress finds the ObjectAddress for the server
* given in AlterOwnerStmt. If missingOk is false and if
* the server does not exist, then it errors out.
*
* Never returns NULL, but the objid in the address can be invalid if missingOk
* was set to true.
*/
ObjectAddress
AlterForeignServerOwnerStmtObjectAddress(Node *node, bool missing_ok)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
char *serverName = strVal(stmt->object);
return GetObjectAddressByServerName(serverName, missing_ok);
}
/*
* GetForeignServerCreateDDLCommand returns a list that includes the CREATE SERVER
* command that would recreate the given server on a new node.
*/
List *
GetForeignServerCreateDDLCommand(Oid serverId)
{
/* generate a statement for creation of the server in "if not exists" construct */
Node *stmt = RecreateForeignServerStmt(serverId);
/* capture ddl command for the create statement */
const char *ddlCommand = DeparseTreeNode(stmt);
List *ddlCommands = list_make1((void *) ddlCommand);
return ddlCommands;
}
/*
* RecreateForeignServerStmt returns a parsetree for a CREATE SERVER statement
* that would recreate the given server on a new node.
*/
static Node *
RecreateForeignServerStmt(Oid serverId)
{
ForeignServer *server = GetForeignServer(serverId);
CreateForeignServerStmt *createStmt = makeNode(CreateForeignServerStmt);
/* set server name and if_not_exists fields */
createStmt->servername = pstrdup(server->servername);
createStmt->if_not_exists = true;
/* set foreign data wrapper */
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
createStmt->fdwname = pstrdup(fdw->fdwname);
/* set all fields using the existing server */
if (server->servertype != NULL)
{
createStmt->servertype = pstrdup(server->servertype);
}
if (server->serverversion != NULL)
{
createStmt->version = pstrdup(server->serverversion);
}
createStmt->options = NIL;
int location = -1;
DefElem *option = NULL;
foreach_ptr(option, server->options)
{
DefElem *copyOption = makeDefElem(option->defname, option->arg, location);
createStmt->options = lappend(createStmt->options, copyOption);
}
return (Node *) createStmt;
}
/*
* NameListHasDistributedServer takes a namelist of servers and returns true if at least
* one of them is distributed. Returns false otherwise.
*/
static bool
NameListHasDistributedServer(List *serverNames)
{
Value *serverValue = NULL;
foreach_ptr(serverValue, serverNames)
{
ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false);
if (IsObjectDistributed(&address))
{
return true;
}
}
return false;
}
static ObjectAddress
GetObjectAddressByServerName(char *serverName, bool missing_ok)
{
ForeignServer *server = GetForeignServerByName(serverName, missing_ok);
Oid serverOid = server->serverid;
ObjectAddress address = { 0 };
ObjectAddressSet(address, ForeignServerRelationId, serverOid);
return address;
}

View File

@ -77,7 +77,6 @@
static void deparse_index_columns(StringInfo buffer, List *indexParameterList,
List *deparseContext);
static void AppendOptionListToString(StringInfo stringData, List *options);
static void AppendStorageParametersToString(StringInfo stringBuffer,
List *optionList);
static void simple_quote_literal(StringInfo buf, const char *val);
@ -1056,7 +1055,7 @@ generate_qualified_relation_name(Oid relid)
* AppendOptionListToString converts the option list to its textual format, and
* appends this text to the given string buffer.
*/
static void
void
AppendOptionListToString(StringInfo stringBuffer, List *optionList)
{
if (optionList != NIL)

View File

@ -0,0 +1,277 @@
/*-------------------------------------------------------------------------
*
* deparse_foreign_server_stmts.c
* All routines to deparse foreign server statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "commands/defrem.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/relay_utility.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "utils/builtins.h"
static void AppendCreateForeignServerStmt(StringInfo buf, CreateForeignServerStmt *stmt);
static void AppendAlterForeignServerStmt(StringInfo buf, AlterForeignServerStmt *stmt);
static void AppendAlterForeignServerOptions(StringInfo buf, AlterForeignServerStmt *stmt);
static void AppendAlterForeignServerRenameStmt(StringInfo buf, RenameStmt *stmt);
static void AppendAlterForeignServerOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendDropForeignServerStmt(StringInfo buf, DropStmt *stmt);
static void AppendServerNames(StringInfo buf, DropStmt *stmt);
static void AppendBehavior(StringInfo buf, DropStmt *stmt);
static char * GetDefElemActionString(DefElemAction action);
char *
DeparseCreateForeignServerStmt(Node *node)
{
CreateForeignServerStmt *stmt = castNode(CreateForeignServerStmt, node);
StringInfoData str;
initStringInfo(&str);
AppendCreateForeignServerStmt(&str, stmt);
return str.data;
}
char *
DeparseAlterForeignServerStmt(Node *node)
{
AlterForeignServerStmt *stmt = castNode(AlterForeignServerStmt, node);
StringInfoData str;
initStringInfo(&str);
AppendAlterForeignServerStmt(&str, stmt);
return str.data;
}
char *
DeparseAlterForeignServerRenameStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_FOREIGN_SERVER);
StringInfoData str;
initStringInfo(&str);
AppendAlterForeignServerRenameStmt(&str, stmt);
return str.data;
}
char *
DeparseAlterForeignServerOwnerStmt(Node *node)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_FOREIGN_SERVER);
StringInfoData str;
initStringInfo(&str);
AppendAlterForeignServerOwnerStmt(&str, stmt);
return str.data;
}
char *
DeparseDropForeignServerStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
Assert(stmt->removeType == OBJECT_FOREIGN_SERVER);
StringInfoData str;
initStringInfo(&str);
AppendDropForeignServerStmt(&str, stmt);
return str.data;
}
static void
AppendCreateForeignServerStmt(StringInfo buf, CreateForeignServerStmt *stmt)
{
appendStringInfoString(buf, "CREATE SERVER ");
if (stmt->if_not_exists)
{
appendStringInfoString(buf, "IF NOT EXISTS ");
}
appendStringInfo(buf, "%s ", quote_identifier(stmt->servername));
if (stmt->servertype)
{
appendStringInfo(buf, "TYPE %s ", quote_literal_cstr(stmt->servertype));
}
if (stmt->version)
{
appendStringInfo(buf, "VERSION %s ", quote_literal_cstr(stmt->version));
}
appendStringInfo(buf, "FOREIGN DATA WRAPPER %s ", quote_identifier(stmt->fdwname));
AppendOptionListToString(buf, stmt->options);
}
static void
AppendAlterForeignServerStmt(StringInfo buf, AlterForeignServerStmt *stmt)
{
appendStringInfo(buf, "ALTER SERVER %s ", quote_identifier(stmt->servername));
if (stmt->has_version)
{
appendStringInfo(buf, "VERSION %s ", quote_literal_cstr(stmt->version));
}
AppendAlterForeignServerOptions(buf, stmt);
}
static void
AppendAlterForeignServerOptions(StringInfo buf, AlterForeignServerStmt *stmt)
{
if (list_length(stmt->options) <= 0)
{
return;
}
appendStringInfoString(buf, "OPTIONS (");
DefElemAction action = DEFELEM_UNSPEC;
DefElem *def = NULL;
foreach_ptr(def, stmt->options)
{
if (def->defaction != DEFELEM_UNSPEC)
{
action = def->defaction;
char *actionString = GetDefElemActionString(action);
appendStringInfo(buf, "%s ", actionString);
}
appendStringInfo(buf, "%s", quote_identifier(def->defname));
if (action != DEFELEM_DROP)
{
const char *value = quote_literal_cstr(defGetString(def));
appendStringInfo(buf, " %s", value);
}
if (def != llast(stmt->options))
{
appendStringInfoString(buf, ", ");
}
}
appendStringInfoString(buf, ")");
}
static void
AppendAlterForeignServerRenameStmt(StringInfo buf, RenameStmt *stmt)
{
appendStringInfo(buf, "ALTER SERVER %s RENAME TO %s",
quote_identifier(strVal(stmt->object)),
quote_identifier(stmt->newname));
}
static void
AppendAlterForeignServerOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
{
const char *servername = quote_identifier(strVal(stmt->object));
appendStringInfo(buf, "ALTER SERVER %s OWNER TO ", servername);
appendStringInfo(buf, "%s", RoleSpecString(stmt->newowner, true));
}
static void
AppendDropForeignServerStmt(StringInfo buf, DropStmt *stmt)
{
appendStringInfoString(buf, "DROP SERVER ");
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
AppendServerNames(buf, stmt);
AppendBehavior(buf, stmt);
}
static void
AppendServerNames(StringInfo buf, DropStmt *stmt)
{
Value *serverValue = NULL;
foreach_ptr(serverValue, stmt->objects)
{
const char *serverString = quote_identifier(strVal(serverValue));
appendStringInfo(buf, "%s", serverString);
if (serverValue != llast(stmt->objects))
{
appendStringInfoString(buf, ", ");
}
}
}
static void
AppendBehavior(StringInfo buf, DropStmt *stmt)
{
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
else if (stmt->behavior == DROP_RESTRICT)
{
appendStringInfoString(buf, " RESTRICT");
}
}
static char *
GetDefElemActionString(DefElemAction action)
{
switch (action)
{
case DEFELEM_ADD:
{
return "ADD";
}
case DEFELEM_SET:
{
return "SET";
}
case DEFELEM_DROP:
{
return "DROP";
}
default:
return "";
}
}

View File

@ -612,6 +612,11 @@ SupportedDependencyByCitus(const ObjectAddress *address)
return true;
}
case OCLASS_FOREIGN_SERVER:
{
return true;
}
case OCLASS_ROLE:
{
/*

View File

@ -456,6 +456,14 @@ ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, ObjectAddress *addr,
break;
}
case OBJECT_FOREIGN_SERVER:
{
idToCheck = addr->objectId;
aclMaskResult = pg_foreign_server_aclmask(idToCheck, userId, ACL_USAGE,
ACLMASK_ANY);
break;
}
case OBJECT_SEQUENCE:
{
idToCheck = addr->objectId;

View File

@ -65,6 +65,7 @@ extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid);
extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);
extern List * getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype);
extern void AppendOptionListToString(StringInfo stringData, List *options);
#endif /* CITUS_RULEUTILS_H */

View File

@ -225,6 +225,27 @@ extern Oid GetReferencedTableId(Oid foreignKeyId);
extern Oid GetReferencingTableId(Oid foreignKeyId);
extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId);
/* foreign_server.c - forward declarations */
extern List * PreprocessCreateForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessRenameForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessDropForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessCreateForeignServerStmt(Node *node, const char *queryString);
extern List * PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString);
extern ObjectAddress CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterForeignServerOwnerStmtObjectAddress(Node *node, bool
missing_ok);
extern List * GetForeignServerCreateDDLCommand(Oid serverId);
/* function.c - forward declarations */
extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString,

View File

@ -46,6 +46,13 @@ extern void QualifyRenameCollationStmt(Node *stmt);
extern void QualifyAlterCollationSchemaStmt(Node *stmt);
extern void QualifyAlterCollationOwnerStmt(Node *stmt);
/* forward declarations for deparse_foreign_server_stmts.c */
extern char * DeparseCreateForeignServerStmt(Node *node);
extern char * DeparseAlterForeignServerStmt(Node *node);
extern char * DeparseAlterForeignServerRenameStmt(Node *node);
extern char * DeparseAlterForeignServerOwnerStmt(Node *node);
extern char * DeparseDropForeignServerStmt(Node *node);
/* forward declarations for deparse_table_stmts.c */
extern char * DeparseAlterTableSchemaStmt(Node *stmt);
extern char * DeparseAlterTableStmt(Node *node);

View File

@ -612,7 +612,7 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist
SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%';
metadata_command
---------------------------------------------------------------------
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('server', ARRAY['fake_fdw_server']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
(1 row)
-- valid distribution with distribution_arg_index

View File

@ -62,6 +62,15 @@ RETURNS fdw_handler
AS 'citus'
LANGUAGE C STRICT;
CREATE FOREIGN DATA WRAPPER fake_fdw_1 HANDLER fake_fdw_handler;
SELECT run_command_on_workers($$
CREATE FOREIGN DATA WRAPPER fake_fdw_1 HANDLER fake_fdw_handler;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE FOREIGN DATA WRAPPER")
(localhost,57638,t,"CREATE FOREIGN DATA WRAPPER")
(2 rows)
CREATE SERVER fake_fdw_server_1 FOREIGN DATA WRAPPER fake_fdw_1;
CREATE FOREIGN TABLE foreign_table (
key int,

View File

@ -0,0 +1,131 @@
CREATE SCHEMA propagate_foreign_server;
SET search_path TO propagate_foreign_server;
-- remove node to add later
SELECT citus_remove_node('localhost', :worker_1_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- create schema, extension and foreign server while the worker is removed
SET citus.enable_ddl_propagation TO OFF;
CREATE SCHEMA test_dependent_schema;
CREATE EXTENSION postgres_fdw WITH SCHEMA test_dependent_schema;
SET citus.enable_ddl_propagation TO ON;
CREATE SERVER foreign_server_dependent_schema
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- verify the dependent schema and the foreign server are created on the newly added worker
SELECT run_command_on_workers(
$$SELECT COUNT(*) FROM pg_namespace WHERE nspname = 'test_dependent_schema';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
SELECT run_command_on_workers(
$$SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server_dependent_schema';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,t)
(localhost,57638,t,t)
(2 rows)
CREATE SERVER foreign_server_to_drop
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
--should error
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
ERROR: cannot drop distributed server with other servers
HINT: Try dropping each object in a separate DROP command
SET client_min_messages TO ERROR;
DROP SCHEMA test_dependent_schema CASCADE;
RESET client_min_messages;
-- test propagating foreign server creation
CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server TYPE 'test_type' VERSION 'v1'
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'testhost', port '5432', dbname 'testdb');
SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server';
?column?
---------------------------------------------------------------------
t
(1 row)
-- verify that the server is created on the worker
SELECT run_command_on_workers(
$$SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,t)
(localhost,57638,t,t)
(2 rows)
ALTER SERVER foreign_server OPTIONS (ADD "fdw_startup_cost" '1000');
ALTER SERVER foreign_server OPTIONS (ADD passfile 'to_be_dropped');
ALTER SERVER foreign_server OPTIONS (SET host 'localhost');
ALTER SERVER foreign_server OPTIONS (DROP port, DROP dbname);
ALTER SERVER foreign_server OPTIONS (ADD port :'master_port', dbname 'regression', DROP passfile);
ALTER SERVER foreign_server RENAME TO "foreign'server_1!";
-- test alter owner
SELECT rolname FROM pg_roles JOIN pg_foreign_server ON (pg_roles.oid=pg_foreign_server.srvowner) WHERE srvname = 'foreign''server_1!';
rolname
---------------------------------------------------------------------
postgres
(1 row)
ALTER SERVER "foreign'server_1!" OWNER TO pg_monitor;
-- verify that the server is renamed on the worker
SELECT run_command_on_workers(
$$SELECT srvoptions FROM pg_foreign_server WHERE srvname = 'foreign''server_1!';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"{host=localhost,fdw_startup_cost=1000,port=57636,dbname=regression}")
(localhost,57638,t,"{host=localhost,fdw_startup_cost=1000,port=57636,dbname=regression}")
(2 rows)
-- verify the owner is changed
SELECT run_command_on_workers(
$$SELECT rolname FROM pg_roles WHERE oid IN (SELECT srvowner FROM pg_foreign_server WHERE srvname = 'foreign''server_1!');$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,pg_monitor)
(localhost,57638,t,pg_monitor)
(2 rows)
-- verify the owner is changed on the coordinator
SELECT rolname FROM pg_roles JOIN pg_foreign_server ON (pg_roles.oid=pg_foreign_server.srvowner) WHERE srvname = 'foreign''server_1!';
rolname
---------------------------------------------------------------------
pg_monitor
(1 row)
DROP SERVER IF EXISTS "foreign'server_1!" CASCADE;
-- verify that the server is dropped on the worker
SELECT run_command_on_workers(
$$SELECT COUNT(*)=0 FROM pg_foreign_server WHERE srvname = 'foreign''server_1!';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,t)
(localhost,57638,t,t)
(2 rows)
\c - - - :worker_1_port
-- not allowed on the worker
ALTER SERVER foreign_server OPTIONS (ADD async_capable 'False');
ERROR: server "foreign_server" does not exist
CREATE SERVER foreign_server_1 TYPE 'test_type' VERSION 'v1'
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'testhost', port '5432', dbname 'testdb');
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
\c - - - :master_port
DROP SCHEMA propagate_foreign_server CASCADE;
NOTICE: drop cascades to extension postgres_fdw

View File

@ -26,6 +26,7 @@ test: multi_cluster_management
# below tests are placed right after multi_cluster_management as we do
# remove/add node operations and we do not want any preexisting objects
test: non_super_user_object_metadata
test: propagate_foreign_servers
test: alter_role_propagation
test: propagate_extension_commands
test: escape_extension_name

View File

@ -1,5 +1,6 @@
test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw
test: multi_test_helpers multi_test_helpers_superuser
test: multi_cluster_management
test: multi_create_fdw
test: multi_test_catalog_views
test: replicated_table_disable_node

View File

@ -924,14 +924,6 @@ if (!$conninfo)
'-c', "CREATE FOREIGN DATA WRAPPER $fdw HANDLER $fdws{$fdw};")) == 0
or die "Could not create foreign data wrapper $fdw on worker";
}
foreach my $fdwServer (keys %fdwServers)
{
system(catfile($bindir, "psql"),
('-X', '-h', $host, '-p', $port, '-U', $user, "-d", "regression",
'-c', "CREATE SERVER $fdwServer FOREIGN DATA WRAPPER $fdwServers{$fdwServer};")) == 0
or die "Could not create server $fdwServer on worker";
}
}
}
else
@ -958,14 +950,6 @@ else
'-c', "SELECT run_command_on_workers('CREATE FOREIGN DATA WRAPPER $fdw HANDLER $fdws{$fdw};');")) == 0
or die "Could not create foreign data wrapper $fdw on worker";
}
foreach my $fdwServer (keys %fdwServers)
{
system(catfile($bindir, "psql"),
('-X', '-h', $host, '-p', $masterPort, '-U', $user, "-d", $dbname,
'-c', "SELECT run_command_on_workers('CREATE SERVER $fdwServer FOREIGN DATA WRAPPER $fdwServers{$fdwServer};');")) == 0
or die "Could not create server $fdwServer on worker";
}
}
# Prepare pg_regress arguments

View File

@ -40,6 +40,9 @@ RETURNS fdw_handler
AS 'citus'
LANGUAGE C STRICT;
CREATE FOREIGN DATA WRAPPER fake_fdw_1 HANDLER fake_fdw_handler;
SELECT run_command_on_workers($$
CREATE FOREIGN DATA WRAPPER fake_fdw_1 HANDLER fake_fdw_handler;
$$);
CREATE SERVER fake_fdw_server_1 FOREIGN DATA WRAPPER fake_fdw_1;
CREATE FOREIGN TABLE foreign_table (

View File

@ -0,0 +1,83 @@
CREATE SCHEMA propagate_foreign_server;
SET search_path TO propagate_foreign_server;
-- remove node to add later
SELECT citus_remove_node('localhost', :worker_1_port);
-- create schema, extension and foreign server while the worker is removed
SET citus.enable_ddl_propagation TO OFF;
CREATE SCHEMA test_dependent_schema;
CREATE EXTENSION postgres_fdw WITH SCHEMA test_dependent_schema;
SET citus.enable_ddl_propagation TO ON;
CREATE SERVER foreign_server_dependent_schema
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
-- verify the dependent schema and the foreign server are created on the newly added worker
SELECT run_command_on_workers(
$$SELECT COUNT(*) FROM pg_namespace WHERE nspname = 'test_dependent_schema';$$);
SELECT run_command_on_workers(
$$SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server_dependent_schema';$$);
CREATE SERVER foreign_server_to_drop
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
--should error
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
SET client_min_messages TO ERROR;
DROP SCHEMA test_dependent_schema CASCADE;
RESET client_min_messages;
-- test propagating foreign server creation
CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server TYPE 'test_type' VERSION 'v1'
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'testhost', port '5432', dbname 'testdb');
SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server';
-- verify that the server is created on the worker
SELECT run_command_on_workers(
$$SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server';$$);
ALTER SERVER foreign_server OPTIONS (ADD "fdw_startup_cost" '1000');
ALTER SERVER foreign_server OPTIONS (ADD passfile 'to_be_dropped');
ALTER SERVER foreign_server OPTIONS (SET host 'localhost');
ALTER SERVER foreign_server OPTIONS (DROP port, DROP dbname);
ALTER SERVER foreign_server OPTIONS (ADD port :'master_port', dbname 'regression', DROP passfile);
ALTER SERVER foreign_server RENAME TO "foreign'server_1!";
-- test alter owner
SELECT rolname FROM pg_roles JOIN pg_foreign_server ON (pg_roles.oid=pg_foreign_server.srvowner) WHERE srvname = 'foreign''server_1!';
ALTER SERVER "foreign'server_1!" OWNER TO pg_monitor;
-- verify that the server is renamed on the worker
SELECT run_command_on_workers(
$$SELECT srvoptions FROM pg_foreign_server WHERE srvname = 'foreign''server_1!';$$);
-- verify the owner is changed
SELECT run_command_on_workers(
$$SELECT rolname FROM pg_roles WHERE oid IN (SELECT srvowner FROM pg_foreign_server WHERE srvname = 'foreign''server_1!');$$);
-- verify the owner is changed on the coordinator
SELECT rolname FROM pg_roles JOIN pg_foreign_server ON (pg_roles.oid=pg_foreign_server.srvowner) WHERE srvname = 'foreign''server_1!';
DROP SERVER IF EXISTS "foreign'server_1!" CASCADE;
-- verify that the server is dropped on the worker
SELECT run_command_on_workers(
$$SELECT COUNT(*)=0 FROM pg_foreign_server WHERE srvname = 'foreign''server_1!';$$);
\c - - - :worker_1_port
-- not allowed on the worker
ALTER SERVER foreign_server OPTIONS (ADD async_capable 'False');
CREATE SERVER foreign_server_1 TYPE 'test_type' VERSION 'v1'
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'testhost', port '5432', dbname 'testdb');
\c - - - :master_port
DROP SCHEMA propagate_foreign_server CASCADE;