mirror of https://github.com/citusdata/citus.git
Support foreign tables in MX (#5461)
parent
5305aa4246
commit
45e423136c
|
@ -368,7 +368,7 @@ UndistributeTable(TableConversionParameters *params)
|
|||
EnsureTableNotReferencing(params->relationId, UNDISTRIBUTE_TABLE);
|
||||
EnsureTableNotReferenced(params->relationId, UNDISTRIBUTE_TABLE);
|
||||
}
|
||||
EnsureTableNotForeign(params->relationId);
|
||||
|
||||
EnsureTableNotPartition(params->relationId);
|
||||
|
||||
if (PartitionedTable(params->relationId))
|
||||
|
@ -994,8 +994,7 @@ EnsureTableNotReferenced(Oid relationId, char conversionType)
|
|||
void
|
||||
EnsureTableNotForeign(Oid relationId)
|
||||
{
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
if (IsForeignTable(relationId))
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot complete operation "
|
||||
"because it is a foreign table")));
|
||||
|
@ -1063,7 +1062,7 @@ CreateTableConversion(TableConversionParameters *params)
|
|||
BuildDistributionKeyFromColumnName(relation, con->distributionColumn);
|
||||
|
||||
con->originalAccessMethod = NULL;
|
||||
if (!PartitionedTable(con->relationId))
|
||||
if (!PartitionedTable(con->relationId) && !IsForeignTable(con->relationId))
|
||||
{
|
||||
HeapTuple amTuple = SearchSysCache1(AMOID, ObjectIdGetDatum(
|
||||
relation->rd_rel->relam));
|
||||
|
@ -1305,7 +1304,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
|
|||
|
||||
StringInfo query = makeStringInfo();
|
||||
|
||||
if (!PartitionedTable(sourceId))
|
||||
if (!PartitionedTable(sourceId) && !IsForeignTable(sourceId))
|
||||
{
|
||||
if (!suppressNoticeMessages)
|
||||
{
|
||||
|
@ -1402,7 +1401,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
|
|||
}
|
||||
|
||||
resetStringInfo(query);
|
||||
appendStringInfo(query, "DROP TABLE %s CASCADE",
|
||||
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
|
||||
IsForeignTable(sourceId) ? "FOREIGN " : "",
|
||||
quote_qualified_identifier(schemaName, sourceName));
|
||||
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
|
||||
|
||||
|
|
|
@ -797,6 +797,7 @@ GetDistributeObjectOps(Node *node)
|
|||
return &Statistics_AlterObjectSchema;
|
||||
}
|
||||
|
||||
case OBJECT_FOREIGN_TABLE:
|
||||
case OBJECT_TABLE:
|
||||
{
|
||||
return &Table_AlterObjectSchema;
|
||||
|
|
|
@ -648,7 +648,7 @@ List *
|
|||
PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_TABLE);
|
||||
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
|
||||
|
||||
/*
|
||||
* We will let Postgres deal with missing_ok
|
||||
|
@ -1054,7 +1054,8 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
|||
*/
|
||||
Assert(IsCitusTable(rightRelationId));
|
||||
}
|
||||
else if (attachedRelationKind == RELKIND_RELATION)
|
||||
else if (attachedRelationKind == RELKIND_RELATION ||
|
||||
attachedRelationKind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
Assert(list_length(commandList) <= 1);
|
||||
|
||||
|
@ -1761,7 +1762,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
|
|||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_TABLE);
|
||||
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
|
||||
|
||||
if (stmt->relation == NULL)
|
||||
{
|
||||
|
@ -2951,6 +2952,16 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
break;
|
||||
}
|
||||
|
||||
case AT_GenericOptions:
|
||||
{
|
||||
if (IsForeignTable(relationId))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* fallthrough */
|
||||
|
||||
default:
|
||||
{
|
||||
ereport(ERROR,
|
||||
|
@ -3326,7 +3337,7 @@ ObjectAddress
|
|||
AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_TABLE);
|
||||
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
|
||||
|
||||
const char *tableName = stmt->relation->relname;
|
||||
Oid tableOid = InvalidOid;
|
||||
|
|
|
@ -267,15 +267,13 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
|
|||
|
||||
ErrorIfIllegallyChangingKnownShard(relationId);
|
||||
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
if (IsCitusTable(relationId) &&
|
||||
relationKind == RELKIND_FOREIGN_TABLE)
|
||||
if (IsCitusTable(relationId) && IsForeignTable(relationId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("truncating distributed foreign tables is "
|
||||
"currently unsupported"),
|
||||
errhint("Use citus_drop_all_shards to remove "
|
||||
"foreign table's shards.")));
|
||||
errhint("Consider undistributing table before TRUNCATE, "
|
||||
"and then distribute or add to metadata again")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,6 +63,7 @@
|
|||
#include "distributed/transmit.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "foreign/foreign.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
@ -98,6 +99,8 @@ static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
|
|||
static bool IsDropSchemaOrDB(Node *parsetree);
|
||||
static bool ShouldCheckUndistributeCitusLocalTables(void);
|
||||
static bool ShouldAddNewTableToMetadata(Node *parsetree);
|
||||
static bool ServerUsesPostgresFDW(char *serverName);
|
||||
static void ErrorIfOptionListHasNoTableName(List *optionList);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -662,6 +665,29 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
PostprocessCreateTableStmt(createStatement, queryString);
|
||||
}
|
||||
|
||||
if (IsA(parsetree, CreateForeignTableStmt))
|
||||
{
|
||||
CreateForeignTableStmt *createForeignTableStmt =
|
||||
(CreateForeignTableStmt *) parsetree;
|
||||
|
||||
CreateStmt *createTableStmt = (CreateStmt *) (&createForeignTableStmt->base);
|
||||
|
||||
/*
|
||||
* Error out with a hint if the foreign table is using postgres_fdw and
|
||||
* the option table_name is not provided.
|
||||
* Citus relays all the Citus local foreign table logic to the placement of the
|
||||
* Citus local table. If table_name is NOT provided, Citus would try to talk to
|
||||
* the foreign postgres table over the shard's table name, which would not exist
|
||||
* on the remote server.
|
||||
*/
|
||||
if (ServerUsesPostgresFDW(createForeignTableStmt->servername))
|
||||
{
|
||||
ErrorIfOptionListHasNoTableName(createForeignTableStmt->options);
|
||||
}
|
||||
|
||||
PostprocessCreateTableStmt(createTableStmt, queryString);
|
||||
}
|
||||
|
||||
/* after local command has completed, finish by executing worker DDLJobs, if any */
|
||||
if (ddlJobs != NIL)
|
||||
{
|
||||
|
@ -891,14 +917,24 @@ ShouldCheckUndistributeCitusLocalTables(void)
|
|||
static bool
|
||||
ShouldAddNewTableToMetadata(Node *parsetree)
|
||||
{
|
||||
if (!IsA(parsetree, CreateStmt))
|
||||
CreateStmt *createTableStmt;
|
||||
|
||||
if (IsA(parsetree, CreateStmt))
|
||||
{
|
||||
/* if the command is not CREATE TABLE, we can early return false */
|
||||
createTableStmt = (CreateStmt *) parsetree;
|
||||
}
|
||||
else if (IsA(parsetree, CreateForeignTableStmt))
|
||||
{
|
||||
CreateForeignTableStmt *createForeignTableStmt =
|
||||
(CreateForeignTableStmt *) parsetree;
|
||||
createTableStmt = (CreateStmt *) &(createForeignTableStmt->base);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* if the command is not CREATE [FOREIGN] TABLE, we can early return false */
|
||||
return false;
|
||||
}
|
||||
|
||||
CreateStmt *createTableStmt = (CreateStmt *) parsetree;
|
||||
|
||||
if (createTableStmt->relation->relpersistence == RELPERSISTENCE_TEMP ||
|
||||
createTableStmt->partbound != NULL)
|
||||
{
|
||||
|
@ -924,6 +960,50 @@ ShouldAddNewTableToMetadata(Node *parsetree)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ServerUsesPostgresFDW gets a foreign server name and returns true if the FDW that
|
||||
* the server depends on is postgres_fdw. Returns false otherwise.
|
||||
*/
|
||||
static bool
|
||||
ServerUsesPostgresFDW(char *serverName)
|
||||
{
|
||||
ForeignServer *server = GetForeignServerByName(serverName, false);
|
||||
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
|
||||
|
||||
if (strcmp(fdw->fdwname, "postgres_fdw") == 0)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfOptionListHasNoTableName gets an option list (DefElem) and errors out
|
||||
* if the list does not contain a table_name element.
|
||||
*/
|
||||
static void
|
||||
ErrorIfOptionListHasNoTableName(List *optionList)
|
||||
{
|
||||
char *table_nameString = "table_name";
|
||||
DefElem *option = NULL;
|
||||
foreach_ptr(option, optionList)
|
||||
{
|
||||
char *optionName = option->defname;
|
||||
if (strcmp(optionName, table_nameString) == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ereport(ERROR, (errmsg(
|
||||
"table_name option must be provided when using postgres_fdw with Citus"),
|
||||
errhint("Provide the option \"table_name\" with value target table's"
|
||||
" name")));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* NotifyUtilityHookConstraintDropped sets ConstraintDropped to true to tell us
|
||||
* last command dropped a table constraint.
|
||||
|
|
|
@ -30,7 +30,7 @@ DeparseAlterTableSchemaStmt(Node *node)
|
|||
StringInfoData str = { 0 };
|
||||
initStringInfo(&str);
|
||||
|
||||
Assert(stmt->objectType == OBJECT_TABLE);
|
||||
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
|
||||
|
||||
AppendAlterTableSchemaStmt(&str, stmt);
|
||||
return str.data;
|
||||
|
@ -40,8 +40,10 @@ DeparseAlterTableSchemaStmt(Node *node)
|
|||
static void
|
||||
AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
|
||||
{
|
||||
Assert(stmt->objectType == OBJECT_TABLE);
|
||||
appendStringInfo(buf, "ALTER TABLE ");
|
||||
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
|
||||
|
||||
bool isForeignTable = stmt->objectType == OBJECT_FOREIGN_TABLE;
|
||||
appendStringInfo(buf, "ALTER %sTABLE ", isForeignTable ? "FOREIGN " : "");
|
||||
if (stmt->missing_ok)
|
||||
{
|
||||
appendStringInfo(buf, "IF EXISTS ");
|
||||
|
|
|
@ -29,7 +29,7 @@ void
|
|||
QualifyAlterTableSchemaStmt(Node *node)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_TABLE);
|
||||
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
|
||||
|
||||
if (stmt->relation->schemaname == NULL)
|
||||
{
|
||||
|
|
|
@ -679,21 +679,24 @@ MetadataCreateCommands(void)
|
|||
/* after all tables are created, create the metadata */
|
||||
foreach_ptr(cacheEntry, propagatedTableList)
|
||||
{
|
||||
Oid clusteredTableId = cacheEntry->relationId;
|
||||
Oid relationId = cacheEntry->relationId;
|
||||
|
||||
/* add the table metadata command first*/
|
||||
char *metadataCommand = DistributionCreateCommand(cacheEntry);
|
||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||
metadataCommand);
|
||||
|
||||
/* add the truncate trigger command after the table became distributed */
|
||||
char *truncateTriggerCreateCommand =
|
||||
TruncateTriggerCreateCommand(cacheEntry->relationId);
|
||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||
truncateTriggerCreateCommand);
|
||||
if (!IsForeignTable(relationId))
|
||||
{
|
||||
/* add the truncate trigger command after the table became distributed */
|
||||
char *truncateTriggerCreateCommand =
|
||||
TruncateTriggerCreateCommand(cacheEntry->relationId);
|
||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||
truncateTriggerCreateCommand);
|
||||
}
|
||||
|
||||
/* add the pg_dist_shard{,placement} entries */
|
||||
List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);
|
||||
|
||||
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||
|
@ -844,8 +847,11 @@ GetDistributedTableDDLEvents(Oid relationId)
|
|||
commandList = lappend(commandList, metadataCommand);
|
||||
|
||||
/* commands to create the truncate trigger of the table */
|
||||
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
|
||||
commandList = lappend(commandList, truncateTriggerCreateCommand);
|
||||
if (!IsForeignTable(relationId))
|
||||
{
|
||||
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
|
||||
commandList = lappend(commandList, truncateTriggerCreateCommand);
|
||||
}
|
||||
|
||||
/* commands to insert pg_dist_shard & pg_dist_placement entries */
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
|
|
|
@ -2162,3 +2162,14 @@ TableOwner(Oid relationId)
|
|||
|
||||
return GetUserNameFromId(userId, false);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsForeignTable takes a relation id and returns true if it's a foreign table.
|
||||
* Returns false otherwise.
|
||||
*/
|
||||
bool
|
||||
IsForeignTable(Oid relationId)
|
||||
{
|
||||
return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE;
|
||||
}
|
||||
|
|
|
@ -576,22 +576,6 @@ GetPreLoadTableCreationCommands(Oid relationId,
|
|||
|
||||
PushOverrideEmptySearchPath(CurrentMemoryContext);
|
||||
|
||||
/* if foreign table, fetch extension and server definitions */
|
||||
char tableType = get_rel_relkind(relationId);
|
||||
if (tableType == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
char *extensionDef = pg_get_extensiondef_string(relationId);
|
||||
char *serverDef = pg_get_serverdef_string(relationId);
|
||||
|
||||
if (extensionDef != NULL)
|
||||
{
|
||||
tableDDLEventList = lappend(tableDDLEventList,
|
||||
makeTableDDLCommandString(extensionDef));
|
||||
}
|
||||
tableDDLEventList = lappend(tableDDLEventList,
|
||||
makeTableDDLCommandString(serverDef));
|
||||
}
|
||||
|
||||
/* fetch table schema and column option definitions */
|
||||
char *tableSchemaDef = pg_get_tableschemadef_string(relationId,
|
||||
includeSequenceDefaults,
|
||||
|
|
|
@ -319,7 +319,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
|||
foreach(colocatedTableCell, colocatedTableList)
|
||||
{
|
||||
Oid colocatedTableId = lfirst_oid(colocatedTableCell);
|
||||
char relationKind = '\0';
|
||||
|
||||
/* check that user has owner rights in all co-located tables */
|
||||
EnsureTableOwner(colocatedTableId);
|
||||
|
@ -332,8 +331,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
|||
*/
|
||||
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
|
||||
|
||||
relationKind = get_rel_relkind(colocatedTableId);
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
if (IsForeignTable(relationId))
|
||||
{
|
||||
char *relationName = get_rel_name(colocatedTableId);
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
|
@ -659,7 +657,6 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
|
|||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
Oid distributedTableId = shardInterval->relationId;
|
||||
|
||||
char relationKind = get_rel_relkind(distributedTableId);
|
||||
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||
|
||||
/* prevent table from being dropped */
|
||||
|
@ -667,7 +664,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
|
|||
|
||||
EnsureTableOwner(distributedTableId);
|
||||
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
if (IsForeignTable(distributedTableId))
|
||||
{
|
||||
char *relationName = get_rel_name(distributedTableId);
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
|
@ -872,8 +869,7 @@ EnsureTableListSuitableForReplication(List *tableIdList)
|
|||
Oid tableId = InvalidOid;
|
||||
foreach_oid(tableId, tableIdList)
|
||||
{
|
||||
char relationKind = get_rel_relkind(tableId);
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
if (IsForeignTable(tableId))
|
||||
{
|
||||
char *relationName = get_rel_name(tableId);
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
|
@ -1462,7 +1458,7 @@ RecreateTableDDLCommandList(Oid relationId)
|
|||
relationName);
|
||||
|
||||
StringInfo dropCommand = makeStringInfo();
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
|
||||
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
|
||||
|
||||
/* build appropriate DROP command based on relation kind */
|
||||
|
@ -1471,7 +1467,7 @@ RecreateTableDDLCommandList(Oid relationId)
|
|||
appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND,
|
||||
qualifiedRelationName);
|
||||
}
|
||||
else if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
else if (IsForeignTable(relationId))
|
||||
{
|
||||
appendStringInfo(dropCommand, DROP_FOREIGN_TABLE_COMMAND,
|
||||
qualifiedRelationName);
|
||||
|
|
|
@ -105,7 +105,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
|||
char storageType = SHARD_STORAGE_TABLE;
|
||||
|
||||
Oid relationId = ResolveRelationId(relationNameText, false);
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
|
||||
EnsureTablePermissions(relationId, ACL_INSERT);
|
||||
CheckDistributedTable(relationId);
|
||||
|
@ -127,7 +126,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
|||
LockRelationOid(DistNodeRelationId(), RowShareLock);
|
||||
|
||||
/* set the storage type of foreign tables to 'f' */
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
if (IsForeignTable(relationId))
|
||||
{
|
||||
storageType = SHARD_STORAGE_FOREIGN;
|
||||
}
|
||||
|
|
|
@ -72,7 +72,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
|
|||
/* we don't extend names in extension or schema commands */
|
||||
NodeTag nodeType = nodeTag(parseTree);
|
||||
if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt ||
|
||||
nodeType == T_CreateSeqStmt || nodeType == T_AlterSeqStmt)
|
||||
nodeType == T_CreateSeqStmt || nodeType == T_AlterSeqStmt ||
|
||||
nodeType == T_CreateForeignServerStmt)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -276,30 +277,7 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
|
|||
break;
|
||||
}
|
||||
|
||||
case T_CreateForeignServerStmt:
|
||||
{
|
||||
CreateForeignServerStmt *serverStmt = (CreateForeignServerStmt *) parseTree;
|
||||
char **serverName = &(serverStmt->servername);
|
||||
|
||||
AppendShardIdToName(serverName, shardId);
|
||||
break;
|
||||
}
|
||||
|
||||
case T_CreateForeignTableStmt:
|
||||
{
|
||||
CreateForeignTableStmt *createStmt = (CreateForeignTableStmt *) parseTree;
|
||||
char **serverName = &(createStmt->servername);
|
||||
|
||||
AppendShardIdToName(serverName, shardId);
|
||||
|
||||
/*
|
||||
* Since CreateForeignTableStmt inherits from CreateStmt and any change
|
||||
* performed on CreateStmt should be done here too, we simply *fall
|
||||
* through* to avoid code repetition.
|
||||
*/
|
||||
}
|
||||
|
||||
/* fallthrough */
|
||||
case T_CreateStmt:
|
||||
{
|
||||
CreateStmt *createStmt = (CreateStmt *) parseTree;
|
||||
|
|
|
@ -45,7 +45,7 @@ static long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char d
|
|||
/*
|
||||
* worker_drop_distributed_table drops the distributed table with the given oid,
|
||||
* then, removes the associated rows from pg_dist_partition, pg_dist_shard and
|
||||
* pg_dist_placement. The function also drops the server for foreign tables.
|
||||
* pg_dist_placement.
|
||||
*
|
||||
* Note that drop fails if any dependent objects are present for any of the
|
||||
* distributed tables. Also, shard placements of the distributed tables are
|
||||
|
@ -64,7 +64,6 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
|||
Oid relationId = ResolveRelationId(relationName, true);
|
||||
|
||||
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
|
||||
char relationKind = '\0';
|
||||
|
||||
if (!OidIsValid(relationId))
|
||||
{
|
||||
|
@ -79,7 +78,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
|||
|
||||
/* first check the relation type */
|
||||
Relation distributedRelation = relation_open(relationId, AccessShareLock);
|
||||
relationKind = distributedRelation->rd_rel->relkind;
|
||||
|
||||
EnsureRelationKindSupported(relationId);
|
||||
|
||||
/* close the relation since we do not need anymore */
|
||||
|
@ -105,28 +104,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
|||
UnmarkObjectDistributed(&ownedSequenceAddress);
|
||||
}
|
||||
|
||||
/* drop the server for the foreign relations */
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
ObjectAddresses *objects = new_object_addresses();
|
||||
ObjectAddress foreignServerObject = { InvalidOid, InvalidOid, 0 };
|
||||
ForeignTable *foreignTable = GetForeignTable(relationId);
|
||||
Oid serverId = foreignTable->serverid;
|
||||
|
||||
/* prepare foreignServerObject for dropping the server */
|
||||
foreignServerObject.classId = ForeignServerRelationId;
|
||||
foreignServerObject.objectId = serverId;
|
||||
foreignServerObject.objectSubId = 0;
|
||||
|
||||
/* add the addresses that are going to be dropped */
|
||||
add_exact_object_address(&distributedTableObject, objects);
|
||||
add_exact_object_address(&foreignServerObject, objects);
|
||||
|
||||
/* drop both the table and the server */
|
||||
performMultipleDeletions(objects, DROP_RESTRICT,
|
||||
PERFORM_DELETION_INTERNAL);
|
||||
}
|
||||
else if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
|
||||
if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
|
||||
{
|
||||
/*
|
||||
* If the table is owned by an extension, we cannot drop it, nor should we
|
||||
|
|
|
@ -247,6 +247,13 @@ extern ObjectAddress AlterForeignServerOwnerStmtObjectAddress(Node *node, bool
|
|||
missing_ok);
|
||||
extern List * GetForeignServerCreateDDLCommand(Oid serverId);
|
||||
|
||||
|
||||
/* foreign_table.c - forward declarations */
|
||||
extern List * PreprocessAlterForeignTableSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
|
||||
|
||||
/* function.c - forward declarations */
|
||||
extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
|
|
@ -264,6 +264,7 @@ extern void EnsureTableNotDistributed(Oid relationId);
|
|||
extern void EnsureRelationExists(Oid relationId);
|
||||
extern bool RegularTable(Oid relationId);
|
||||
extern bool TableEmpty(Oid tableId);
|
||||
extern bool IsForeignTable(Oid relationId);
|
||||
extern bool RelationUsesIdentityColumns(TupleDesc relationDesc);
|
||||
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
|
||||
extern uint64 GetFirstShardId(Oid relationId);
|
||||
|
|
|
@ -225,9 +225,6 @@ CREATE FOREIGN TABLE foreign_table (
|
|||
-- observe that we do not create fdw server for shell table, both shard relation
|
||||
-- & shell relation points to the same same server object
|
||||
SELECT citus_add_local_table_to_metadata('foreign_table');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
NOTICE: server "fake_fdw_server" already exists, skipping
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
citus_add_local_table_to_metadata
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -725,4 +725,5 @@ $$);
|
|||
(2 rows)
|
||||
|
||||
-- cleanup at exit
|
||||
set client_min_messages to error;
|
||||
DROP SCHEMA citus_local_tables_mx CASCADE;
|
||||
|
|
|
@ -0,0 +1,424 @@
|
|||
\set VERBOSITY terse
|
||||
SET citus.next_shard_id TO 1508000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.enable_local_execution TO ON;
|
||||
CREATE SCHEMA foreign_tables_schema_mx;
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
-- test adding foreign table to metadata with the guc
|
||||
SET citus.use_citus_managed_tables TO ON;
|
||||
CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial);
|
||||
INSERT INTO foreign_table_test VALUES (1, 'text_test');
|
||||
CREATE EXTENSION postgres_fdw;
|
||||
CREATE SERVER foreign_server
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER foreign_server
|
||||
OPTIONS (user 'postgres');
|
||||
CREATE FOREIGN TABLE foreign_table (
|
||||
id integer NOT NULL,
|
||||
data text,
|
||||
a bigserial
|
||||
)
|
||||
SERVER foreign_server
|
||||
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
--verify
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
|
||||
partmethod | repmodel
|
||||
---------------------------------------------------------------------
|
||||
n | s
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE parent_for_foreign_tables (
|
||||
project_id integer
|
||||
) PARTITION BY HASH (project_id);
|
||||
CREATE SERVER IF NOT EXISTS srv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
|
||||
CREATE SERVER IF NOT EXISTS srv2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
|
||||
CREATE SERVER IF NOT EXISTS srv3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
|
||||
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (table_name 'dummy');
|
||||
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (table_name 'dummy');
|
||||
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv3 OPTIONS (table_name 'dummy');
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
partmethod | repmodel
|
||||
---------------------------------------------------------------------
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
(4 rows)
|
||||
|
||||
ALTER FOREIGN TABLE foreign_table SET SCHEMA public;
|
||||
ALTER FOREIGN TABLE public.foreign_table RENAME TO foreign_table_newname;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname RENAME COLUMN id TO id_test;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ADD dummy_col bigint NOT NULL DEFAULT 1;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col DROP DEFAULT;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col SET DEFAULT 2;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col TYPE int;
|
||||
ALTER TABLE foreign_table_test RENAME COLUMN id TO id_test;
|
||||
ALTER TABLE foreign_table_test ADD dummy_col int NOT NULL DEFAULT 1;
|
||||
INSERT INTO public.foreign_table_newname VALUES (2, 'test_2');
|
||||
INSERT INTO foreign_table_test VALUES (3, 'test_3');
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c check(id_test < 1000);
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint check_c;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check(id_test < 1000) NOT VALID;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
|
||||
-- trigger test
|
||||
CREATE TABLE distributed_table(value int);
|
||||
SELECT create_distributed_table('distributed_table', 'value');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
|
||||
BEGIN
|
||||
INSERT INTO distributed_table VALUES (42);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$insert_42$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER insert_42_trigger
|
||||
AFTER DELETE ON public.foreign_table_newname
|
||||
FOR EACH ROW EXECUTE FUNCTION insert_42();
|
||||
-- do the same pattern from the workers as well
|
||||
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
|
||||
delete from public.foreign_table_newname where id_test = 99;
|
||||
select * from distributed_table ORDER BY value;
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
42
|
||||
(1 row)
|
||||
|
||||
-- disable trigger
|
||||
alter foreign table public.foreign_table_newname disable trigger insert_42_trigger;
|
||||
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
|
||||
delete from public.foreign_table_newname where id_test = 99;
|
||||
-- should not insert again as trigger disabled
|
||||
select * from distributed_table ORDER BY value;
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
42
|
||||
(1 row)
|
||||
|
||||
DROP TRIGGER insert_42_trigger ON public.foreign_table_newname;
|
||||
-- should throw errors
|
||||
select alter_table_set_access_method('public.foreign_table_newname', 'columnar');
|
||||
ERROR: cannot complete operation because it is a foreign table
|
||||
select alter_distributed_table('public.foreign_table_newname', shard_count:=4);
|
||||
ERROR: cannot alter table because the table is not distributed
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO pg_monitor;
|
||||
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,pg_monitor)
|
||||
(localhost,57638,t,pg_monitor)
|
||||
(2 rows)
|
||||
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO postgres;
|
||||
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,postgres)
|
||||
(localhost,57638,t,postgres)
|
||||
(2 rows)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
SELECT * FROM public.foreign_table_newname ORDER BY id_test;
|
||||
id_test | data | a | dummy_col
|
||||
---------------------------------------------------------------------
|
||||
1 | text_test | 1 | 1
|
||||
2 | test_2 | 1 | 2
|
||||
3 | test_3 | 2 | 1
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM foreign_table_test ORDER BY id_test;
|
||||
id_test | data | a | dummy_col
|
||||
---------------------------------------------------------------------
|
||||
1 | text_test | 1 | 1
|
||||
2 | test_2 | 1 | 2
|
||||
3 | test_3 | 2 | 1
|
||||
(3 rows)
|
||||
|
||||
-- should error out
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname DROP COLUMN id;
|
||||
ERROR: operation is not allowed on this node
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
partmethod | repmodel
|
||||
---------------------------------------------------------------------
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
(4 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
ALTER FOREIGN TABLE foreign_table_newname RENAME TO foreign_table;
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
ALTER FOREIGN TABLE public.foreign_table SET SCHEMA foreign_tables_schema_mx;
|
||||
ALTER FOREIGN TABLE IF EXISTS foreign_table RENAME COLUMN id_test TO id;
|
||||
ALTER TABLE foreign_table_test RENAME COLUMN id_test TO id;
|
||||
ALTER FOREIGN TABLE foreign_table DROP COLUMN id;
|
||||
ALTER FOREIGN TABLE foreign_table DROP COLUMN dummy_col;
|
||||
ALTER TABLE foreign_table_test DROP COLUMN dummy_col;
|
||||
ALTER FOREIGN TABLE foreign_table OPTIONS (DROP schema_name, SET table_name 'notable');
|
||||
SELECT run_command_on_workers($$SELECT f.ftoptions FROM pg_foreign_table f JOIN pg_class c ON f.ftrelid=c.oid WHERE c.relname = 'foreign_table';$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,{table_name=notable})
|
||||
(localhost,57638,t,{table_name=notable})
|
||||
(2 rows)
|
||||
|
||||
ALTER FOREIGN TABLE foreign_table OPTIONS (ADD schema_name 'foreign_tables_schema_mx', SET table_name 'foreign_table_test');
|
||||
SELECT * FROM foreign_table ORDER BY a;
|
||||
data | a
|
||||
---------------------------------------------------------------------
|
||||
text_test | 1
|
||||
test_2 | 1
|
||||
test_3 | 2
|
||||
(3 rows)
|
||||
|
||||
-- test alter user mapping
|
||||
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'nonexistiniguser');
|
||||
-- should fail
|
||||
SELECT * FROM foreign_table ORDER BY a;
|
||||
ERROR: could not connect to server "foreign_server"
|
||||
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'postgres');
|
||||
-- test undistributing
|
||||
DELETE FROM foreign_table;
|
||||
SELECT undistribute_table('foreign_table');
|
||||
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
|
||||
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
|
||||
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('foreign_table','data');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT undistribute_table('foreign_table');
|
||||
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
|
||||
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
|
||||
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_reference_table('foreign_table');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT undistribute_table('foreign_table');
|
||||
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
|
||||
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
|
||||
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO foreign_table_test VALUES (1, 'testt');
|
||||
SELECT * FROM foreign_table ORDER BY a;
|
||||
data | a
|
||||
---------------------------------------------------------------------
|
||||
testt | 3
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM foreign_table_test ORDER BY a;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
(1 row)
|
||||
|
||||
DROP TABLE parent_for_foreign_tables;
|
||||
CREATE TABLE parent_for_foreign_tables (id integer NOT NULL, data text, a bigserial)
|
||||
PARTITION BY HASH (id);
|
||||
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
SELECT citus_add_local_table_to_metadata('parent_for_foreign_tables');
|
||||
citus_add_local_table_to_metadata
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
partmethod | repmodel
|
||||
---------------------------------------------------------------------
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
(4 rows)
|
||||
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER srv1
|
||||
OPTIONS (user 'postgres');
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER srv2
|
||||
OPTIONS (user 'postgres');
|
||||
SELECT * FROM parent_for_foreign_tables ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
1 | testt | 3
|
||||
1 | testt | 3
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM foreign_partition_1 ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM foreign_partition_2 ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM foreign_partition_3 ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
partmethod | repmodel
|
||||
---------------------------------------------------------------------
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
n | s
|
||||
(4 rows)
|
||||
|
||||
SELECT * FROM parent_for_foreign_tables ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
1 | testt | 3
|
||||
1 | testt | 3
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM foreign_partition_1 ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM foreign_partition_2 ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM foreign_partition_3 ORDER BY id;
|
||||
id | data | a
|
||||
---------------------------------------------------------------------
|
||||
1 | testt | 3
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
--verify
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
|
||||
partmethod | repmodel
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
CREATE SERVER foreign_server_local
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER foreign_server_local
|
||||
OPTIONS (user 'postgres');
|
||||
CREATE FOREIGN TABLE foreign_table_local (
|
||||
id integer NOT NULL,
|
||||
data text
|
||||
)
|
||||
SERVER foreign_server_local
|
||||
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
CREATE TABLE dist_tbl(a int);
|
||||
INSERT INTO dist_tbl VALUES (1);
|
||||
SELECT create_distributed_table('dist_tbl','a');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
a | id | data
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | testt
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE ref_tbl(a int);
|
||||
INSERT INTO ref_tbl VALUES (1);
|
||||
SELECT create_reference_table('ref_tbl');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
a | id | data
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | testt
|
||||
(1 row)
|
||||
|
||||
SELECT citus_add_local_table_to_metadata('foreign_table_local');
|
||||
citus_add_local_table_to_metadata
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
a | id | data
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | testt
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
a | id | data
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | testt
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
-- should error out because doesn't have a table_name field
|
||||
CREATE FOREIGN TABLE foreign_table_local_fails (
|
||||
id integer NOT NULL,
|
||||
data text
|
||||
)
|
||||
SERVER foreign_server_local
|
||||
OPTIONS (schema_name 'foreign_tables_schema_mx');
|
||||
ERROR: table_name option must be provided when using postgres_fdw with Citus
|
||||
DROP FOREIGN TABLE foreign_table_local;
|
||||
-- cleanup at exit
|
||||
set client_min_messages to error;
|
||||
DROP SCHEMA foreign_tables_schema_mx CASCADE;
|
|
@ -64,8 +64,6 @@ CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
|
|||
CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
|
||||
CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server;
|
||||
SELECT create_distributed_table('foreign_distributed_table', 'a');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -428,8 +428,6 @@ SELECT create_distributed_table('table_range', 'id', 'range');
|
|||
-- test foreign table creation
|
||||
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
|
||||
SELECT create_distributed_table('table3_groupD', 'id');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -159,8 +159,6 @@ SERVER fake_fdw_server;
|
|||
SET citus.shard_count TO 16;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -172,8 +172,6 @@ CREATE FOREIGN TABLE foreign_table (
|
|||
full_name text not null default ''
|
||||
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
|
||||
SELECT create_distributed_table('foreign_table', 'id');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
@ -200,13 +198,11 @@ order by table_name;
|
|||
|
||||
\c - - :master_host :master_port
|
||||
SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
master_get_table_ddl_events
|
||||
---------------------------------------------------------------------
|
||||
CREATE SERVER IF NOT EXISTS fake_fdw_server FOREIGN DATA WRAPPER fake_fdw
|
||||
CREATE FOREIGN TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 (id bigint NOT NULL, rename_name character(8) DEFAULT ''::text NOT NULL) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true')
|
||||
ALTER TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 OWNER TO postgres
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- propagating views is not supported
|
||||
CREATE VIEW local_view AS SELECT * FROM simple_table;
|
||||
|
|
|
@ -103,8 +103,6 @@ CREATE FOREIGN TABLE remote_engagements (
|
|||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SELECT create_distributed_table('remote_engagements', 'id', 'hash');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -15,6 +15,26 @@ SET citus.enable_ddl_propagation TO ON;
|
|||
CREATE SERVER foreign_server_dependent_schema
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'test');
|
||||
CREATE FOREIGN TABLE foreign_table (
|
||||
id integer NOT NULL,
|
||||
data text
|
||||
)
|
||||
SERVER foreign_server_dependent_schema
|
||||
OPTIONS (schema_name 'test_dependent_schema', table_name 'foreign_table_test');
|
||||
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId=>0);
|
||||
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT citus_add_local_table_to_metadata('foreign_table');
|
||||
citus_add_local_table_to_metadata
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE foreign_table OWNER TO pg_monitor;
|
||||
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
|
@ -38,6 +58,14 @@ SELECT run_command_on_workers(
|
|||
(localhost,57638,t,t)
|
||||
(2 rows)
|
||||
|
||||
-- verify the owner is altered on workers
|
||||
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table';$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,pg_monitor)
|
||||
(localhost,57638,t,pg_monitor)
|
||||
(2 rows)
|
||||
|
||||
CREATE SERVER foreign_server_to_drop
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'test');
|
||||
|
@ -45,6 +73,13 @@ CREATE SERVER foreign_server_to_drop
|
|||
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
|
||||
ERROR: cannot drop distributed server with other servers
|
||||
HINT: Try dropping each object in a separate DROP command
|
||||
DROP FOREIGN TABLE foreign_table;
|
||||
SELECT citus_remove_node('localhost', :master_port);
|
||||
citus_remove_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA test_dependent_schema CASCADE;
|
||||
RESET client_min_messages;
|
||||
|
|
|
@ -145,15 +145,20 @@ CREATE FOREIGN TABLE foreign_table (
|
|||
full_name text not null default ''
|
||||
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
|
||||
SELECT create_distributed_table('foreign_table', 'id');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT undistribute_table('foreign_table');
|
||||
ERROR: cannot complete operation because it is a foreign table
|
||||
NOTICE: creating a new table for undistribute_table.foreign_table
|
||||
NOTICE: dropping the old undistribute_table.foreign_table
|
||||
NOTICE: renaming the new table to undistribute_table.foreign_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
DROP FOREIGN TABLE foreign_table;
|
||||
SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
|
||||
start_metadata_sync_to_node
|
||||
|
|
|
@ -49,6 +49,7 @@ test: local_shard_copy
|
|||
test: undistribute_table_cascade_mx
|
||||
test: citus_local_tables_mx
|
||||
test: citus_local_tables_queries_mx
|
||||
test: foreign_tables_mx
|
||||
test: multi_mx_transaction_recovery
|
||||
test: multi_mx_modifying_xacts
|
||||
test: multi_mx_explain
|
||||
|
|
|
@ -381,5 +381,7 @@ SELECT run_command_on_workers(
|
|||
$$
|
||||
SELECT count(*) FROM pg_catalog.pg_tables WHERE tablename='citus_local_table_4'
|
||||
$$);
|
||||
|
||||
-- cleanup at exit
|
||||
set client_min_messages to error;
|
||||
DROP SCHEMA citus_local_tables_mx CASCADE;
|
||||
|
|
|
@ -0,0 +1,236 @@
|
|||
\set VERBOSITY terse
|
||||
|
||||
SET citus.next_shard_id TO 1508000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.enable_local_execution TO ON;
|
||||
|
||||
CREATE SCHEMA foreign_tables_schema_mx;
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
|
||||
-- test adding foreign table to metadata with the guc
|
||||
SET citus.use_citus_managed_tables TO ON;
|
||||
CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial);
|
||||
INSERT INTO foreign_table_test VALUES (1, 'text_test');
|
||||
CREATE EXTENSION postgres_fdw;
|
||||
CREATE SERVER foreign_server
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER foreign_server
|
||||
OPTIONS (user 'postgres');
|
||||
CREATE FOREIGN TABLE foreign_table (
|
||||
id integer NOT NULL,
|
||||
data text,
|
||||
a bigserial
|
||||
)
|
||||
SERVER foreign_server
|
||||
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
|
||||
--verify
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
|
||||
|
||||
CREATE TABLE parent_for_foreign_tables (
|
||||
project_id integer
|
||||
) PARTITION BY HASH (project_id);
|
||||
|
||||
CREATE SERVER IF NOT EXISTS srv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
|
||||
CREATE SERVER IF NOT EXISTS srv2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
|
||||
CREATE SERVER IF NOT EXISTS srv3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
|
||||
|
||||
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (table_name 'dummy');
|
||||
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (table_name 'dummy');
|
||||
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv3 OPTIONS (table_name 'dummy');
|
||||
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
|
||||
ALTER FOREIGN TABLE foreign_table SET SCHEMA public;
|
||||
ALTER FOREIGN TABLE public.foreign_table RENAME TO foreign_table_newname;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname RENAME COLUMN id TO id_test;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ADD dummy_col bigint NOT NULL DEFAULT 1;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col DROP DEFAULT;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col SET DEFAULT 2;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col TYPE int;
|
||||
ALTER TABLE foreign_table_test RENAME COLUMN id TO id_test;
|
||||
ALTER TABLE foreign_table_test ADD dummy_col int NOT NULL DEFAULT 1;
|
||||
INSERT INTO public.foreign_table_newname VALUES (2, 'test_2');
|
||||
INSERT INTO foreign_table_test VALUES (3, 'test_3');
|
||||
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c check(id_test < 1000);
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint check_c;
|
||||
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check(id_test < 1000) NOT VALID;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
|
||||
|
||||
-- trigger test
|
||||
CREATE TABLE distributed_table(value int);
|
||||
SELECT create_distributed_table('distributed_table', 'value');
|
||||
|
||||
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
|
||||
BEGIN
|
||||
INSERT INTO distributed_table VALUES (42);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$insert_42$ LANGUAGE plpgsql;
|
||||
|
||||
|
||||
CREATE TRIGGER insert_42_trigger
|
||||
AFTER DELETE ON public.foreign_table_newname
|
||||
FOR EACH ROW EXECUTE FUNCTION insert_42();
|
||||
|
||||
-- do the same pattern from the workers as well
|
||||
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
|
||||
delete from public.foreign_table_newname where id_test = 99;
|
||||
select * from distributed_table ORDER BY value;
|
||||
|
||||
-- disable trigger
|
||||
alter foreign table public.foreign_table_newname disable trigger insert_42_trigger;
|
||||
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
|
||||
delete from public.foreign_table_newname where id_test = 99;
|
||||
-- should not insert again as trigger disabled
|
||||
select * from distributed_table ORDER BY value;
|
||||
|
||||
DROP TRIGGER insert_42_trigger ON public.foreign_table_newname;
|
||||
|
||||
-- should throw errors
|
||||
select alter_table_set_access_method('public.foreign_table_newname', 'columnar');
|
||||
select alter_distributed_table('public.foreign_table_newname', shard_count:=4);
|
||||
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO pg_monitor;
|
||||
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO postgres;
|
||||
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
SELECT * FROM public.foreign_table_newname ORDER BY id_test;
|
||||
SELECT * FROM foreign_table_test ORDER BY id_test;
|
||||
-- should error out
|
||||
ALTER FOREIGN TABLE public.foreign_table_newname DROP COLUMN id;
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
\c - - - :master_port
|
||||
ALTER FOREIGN TABLE foreign_table_newname RENAME TO foreign_table;
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
ALTER FOREIGN TABLE public.foreign_table SET SCHEMA foreign_tables_schema_mx;
|
||||
ALTER FOREIGN TABLE IF EXISTS foreign_table RENAME COLUMN id_test TO id;
|
||||
ALTER TABLE foreign_table_test RENAME COLUMN id_test TO id;
|
||||
ALTER FOREIGN TABLE foreign_table DROP COLUMN id;
|
||||
ALTER FOREIGN TABLE foreign_table DROP COLUMN dummy_col;
|
||||
ALTER TABLE foreign_table_test DROP COLUMN dummy_col;
|
||||
ALTER FOREIGN TABLE foreign_table OPTIONS (DROP schema_name, SET table_name 'notable');
|
||||
|
||||
SELECT run_command_on_workers($$SELECT f.ftoptions FROM pg_foreign_table f JOIN pg_class c ON f.ftrelid=c.oid WHERE c.relname = 'foreign_table';$$);
|
||||
|
||||
ALTER FOREIGN TABLE foreign_table OPTIONS (ADD schema_name 'foreign_tables_schema_mx', SET table_name 'foreign_table_test');
|
||||
SELECT * FROM foreign_table ORDER BY a;
|
||||
-- test alter user mapping
|
||||
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'nonexistiniguser');
|
||||
-- should fail
|
||||
SELECT * FROM foreign_table ORDER BY a;
|
||||
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'postgres');
|
||||
-- test undistributing
|
||||
DELETE FROM foreign_table;
|
||||
SELECT undistribute_table('foreign_table');
|
||||
SELECT create_distributed_table('foreign_table','data');
|
||||
SELECT undistribute_table('foreign_table');
|
||||
SELECT create_reference_table('foreign_table');
|
||||
SELECT undistribute_table('foreign_table');
|
||||
|
||||
INSERT INTO foreign_table_test VALUES (1, 'testt');
|
||||
SELECT * FROM foreign_table ORDER BY a;
|
||||
SELECT * FROM foreign_table_test ORDER BY a;
|
||||
|
||||
DROP TABLE parent_for_foreign_tables;
|
||||
|
||||
CREATE TABLE parent_for_foreign_tables (id integer NOT NULL, data text, a bigserial)
|
||||
PARTITION BY HASH (id);
|
||||
|
||||
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
|
||||
SELECT citus_add_local_table_to_metadata('parent_for_foreign_tables');
|
||||
|
||||
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER srv1
|
||||
OPTIONS (user 'postgres');
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER srv2
|
||||
OPTIONS (user 'postgres');
|
||||
|
||||
SELECT * FROM parent_for_foreign_tables ORDER BY id;
|
||||
SELECT * FROM foreign_partition_1 ORDER BY id;
|
||||
SELECT * FROM foreign_partition_2 ORDER BY id;
|
||||
SELECT * FROM foreign_partition_3 ORDER BY id;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
|
||||
ORDER BY logicalrelid;
|
||||
|
||||
SELECT * FROM parent_for_foreign_tables ORDER BY id;
|
||||
SELECT * FROM foreign_partition_1 ORDER BY id;
|
||||
SELECT * FROM foreign_partition_2 ORDER BY id;
|
||||
SELECT * FROM foreign_partition_3 ORDER BY id;
|
||||
\c - - - :master_port
|
||||
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
--verify
|
||||
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
|
||||
|
||||
CREATE SERVER foreign_server_local
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
|
||||
CREATE USER MAPPING FOR CURRENT_USER
|
||||
SERVER foreign_server_local
|
||||
OPTIONS (user 'postgres');
|
||||
CREATE FOREIGN TABLE foreign_table_local (
|
||||
id integer NOT NULL,
|
||||
data text
|
||||
)
|
||||
SERVER foreign_server_local
|
||||
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
|
||||
|
||||
CREATE TABLE dist_tbl(a int);
|
||||
INSERT INTO dist_tbl VALUES (1);
|
||||
SELECT create_distributed_table('dist_tbl','a');
|
||||
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
|
||||
CREATE TABLE ref_tbl(a int);
|
||||
INSERT INTO ref_tbl VALUES (1);
|
||||
SELECT create_reference_table('ref_tbl');
|
||||
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
|
||||
SELECT citus_add_local_table_to_metadata('foreign_table_local');
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
|
||||
\c - - - :master_port
|
||||
|
||||
SET search_path TO foreign_tables_schema_mx;
|
||||
|
||||
-- should error out because doesn't have a table_name field
|
||||
CREATE FOREIGN TABLE foreign_table_local_fails (
|
||||
id integer NOT NULL,
|
||||
data text
|
||||
)
|
||||
SERVER foreign_server_local
|
||||
OPTIONS (schema_name 'foreign_tables_schema_mx');
|
||||
|
||||
DROP FOREIGN TABLE foreign_table_local;
|
||||
|
||||
-- cleanup at exit
|
||||
set client_min_messages to error;
|
||||
DROP SCHEMA foreign_tables_schema_mx CASCADE;
|
|
@ -12,6 +12,16 @@ SET citus.enable_ddl_propagation TO ON;
|
|||
CREATE SERVER foreign_server_dependent_schema
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'test');
|
||||
CREATE FOREIGN TABLE foreign_table (
|
||||
id integer NOT NULL,
|
||||
data text
|
||||
)
|
||||
SERVER foreign_server_dependent_schema
|
||||
OPTIONS (schema_name 'test_dependent_schema', table_name 'foreign_table_test');
|
||||
|
||||
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId=>0);
|
||||
SELECT citus_add_local_table_to_metadata('foreign_table');
|
||||
ALTER TABLE foreign_table OWNER TO pg_monitor;
|
||||
|
||||
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||
|
||||
|
@ -21,12 +31,17 @@ SELECT run_command_on_workers(
|
|||
SELECT run_command_on_workers(
|
||||
$$SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server_dependent_schema';$$);
|
||||
|
||||
-- verify the owner is altered on workers
|
||||
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table';$$);
|
||||
|
||||
CREATE SERVER foreign_server_to_drop
|
||||
FOREIGN DATA WRAPPER postgres_fdw
|
||||
OPTIONS (host 'test');
|
||||
|
||||
--should error
|
||||
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
|
||||
DROP FOREIGN TABLE foreign_table;
|
||||
SELECT citus_remove_node('localhost', :master_port);
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA test_dependent_schema CASCADE;
|
||||
|
|
Loading…
Reference in New Issue