Merge branch 'master' into patch-1

pull/5592/head
Marco Slot 2022-01-06 18:08:20 +01:00 committed by GitHub
commit 072e100072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
59 changed files with 1942 additions and 444 deletions

View File

@ -17,7 +17,6 @@
#include "catalog/pg_type.h"
#include "distributed/pg_version_constants.h"
#include "distributed/tuplestore.h"
#include "distributed/version_compat.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/smgr.h"
@ -26,6 +25,7 @@
#include "utils/rel.h"
#include "utils/tuplestore.h"
#include "pg_version_compat.h"
#include "columnar/columnar.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_version_compat.h"

View File

@ -38,6 +38,7 @@
#include "safe_lib.h"
#include "access/generic_xlog.h"
#include "catalog/storage.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
@ -699,9 +700,12 @@ WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf,
uint32 len, bool clear)
{
Buffer buffer = ReadBuffer(rel, blockno);
GenericXLogState *state = GenericXLogStart(rel);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
Page page = BufferGetPage(buffer);
Page page = GenericXLogRegisterBuffer(state, buffer, GENERIC_XLOG_FULL_IMAGE);
PageHeader phdr = (PageHeader) page;
if (PageIsNew(page) || clear)
{
@ -734,28 +738,10 @@ WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf,
phdr->pd_lower = offset;
}
START_CRIT_SECTION();
memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
phdr->pd_lower += len;
MarkBufferDirty(buffer);
if (RelationNeedsWAL(rel))
{
XLogBeginInsert();
/*
* Since columnar will mostly write whole pages we force the transmission of the
* whole image in the buffer
*/
XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE);
XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
GenericXLogFinish(state);
UnlockReleaseBuffer(buffer);
}

View File

@ -105,6 +105,8 @@ typedef struct IndexFetchColumnarData
} IndexFetchColumnarData;
ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook = NULL;
static object_access_hook_type PrevObjectAccessHook = NULL;
static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
@ -2298,121 +2300,6 @@ ColumnarCheckLogicalReplication(Relation rel)
}
/*
* CitusCreateAlterColumnarTableSet generates a portable
*/
static char *
CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
const ColumnarOptions *options)
{
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf,
"SELECT alter_columnar_table_set(%s, "
"chunk_group_row_limit => %d, "
"stripe_row_limit => %lu, "
"compression_level => %d, "
"compression => %s);",
quote_literal_cstr(qualifiedRelationName),
options->chunkRowCount,
options->stripeRowCount,
options->compressionLevel,
quote_literal_cstr(CompressionTypeStr(options->compressionType)));
return buf.data;
}
/*
* GetTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a table. The resulting command will set the
* options of the table to the same options as the relation on the coordinator.
*/
static char *
GetTableDDLCommandColumnar(void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
tableDDLContext->relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* GetShardedTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a shard. The resulting command will set the
* options of the shard to the same options as the relation the shard is based on.
*/
char *
GetShardedTableDDLCommandColumnar(uint64 shardId, void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
/*
* AppendShardId is destructive of the original cahr *, given we want to serialize
* more than once we copy it before appending the shard id.
*/
char *relationName = pstrdup(tableDDLContext->relationName);
AppendShardIdToName(&relationName, shardId);
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* ColumnarGetCustomTableOptionsDDL returns a TableDDLCommand representing a command that
* will apply the passed columnar options to the relation identified by relationId on a
* new table or shard.
*/
static TableDDLCommand *
ColumnarGetCustomTableOptionsDDL(char *schemaName, char *relationName,
ColumnarOptions options)
{
ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0(
sizeof(ColumnarTableDDLContext));
/* build the context */
context->schemaName = schemaName;
context->relationName = relationName;
context->options = options;
/* create TableDDLCommand based on the context build above */
return makeTableDDLCommandFunction(
GetTableDDLCommandColumnar,
GetShardedTableDDLCommandColumnar,
context);
}
/*
* ColumnarGetTableOptionsDDL returns a TableDDLCommand representing a command that will
* apply the columnar options currently applicable to the relation identified by
* relationId on a new table or shard.
*/
TableDDLCommand *
ColumnarGetTableOptionsDDL(Oid relationId)
{
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
ColumnarOptions options = { 0 };
ReadColumnarOptions(relationId, &options);
return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options);
}
/*
* alter_columnar_table_set is a UDF exposed in postgres to change settings on a columnar
* table. Calling this function on a non-columnar table gives an error.
@ -2522,18 +2409,9 @@ alter_columnar_table_set(PG_FUNCTION_ARGS)
options.compressionLevel)));
}
if (EnableDDLPropagation && IsCitusTable(relationId))
if (ColumnarTableSetOptions_hook != NULL)
{
/* when a columnar table is distributed update all settings on the shards */
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName,
relationName,
options);
DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command);
ExecuteDistributedDDLJob(ddljob);
ColumnarTableSetOptions_hook(relationId, options);
}
SetColumnarOptions(relationId, &options);
@ -2618,18 +2496,9 @@ alter_columnar_table_reset(PG_FUNCTION_ARGS)
columnar_compression_level)));
}
if (EnableDDLPropagation && IsCitusTable(relationId))
if (ColumnarTableSetOptions_hook != NULL)
{
/* when a columnar table is distributed update all settings on the shards */
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName,
relationName,
options);
DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command);
ExecuteDistributedDDLJob(ddljob);
ColumnarTableSetOptions_hook(relationId, options);
}
SetColumnarOptions(relationId, &options);

View File

@ -368,7 +368,7 @@ UndistributeTable(TableConversionParameters *params)
EnsureTableNotReferencing(params->relationId, UNDISTRIBUTE_TABLE);
EnsureTableNotReferenced(params->relationId, UNDISTRIBUTE_TABLE);
}
EnsureTableNotForeign(params->relationId);
EnsureTableNotPartition(params->relationId);
if (PartitionedTable(params->relationId))
@ -994,8 +994,7 @@ EnsureTableNotReferenced(Oid relationId, char conversionType)
void
EnsureTableNotForeign(Oid relationId)
{
char relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_FOREIGN_TABLE)
if (IsForeignTable(relationId))
{
ereport(ERROR, (errmsg("cannot complete operation "
"because it is a foreign table")));
@ -1063,7 +1062,7 @@ CreateTableConversion(TableConversionParameters *params)
BuildDistributionKeyFromColumnName(relation, con->distributionColumn);
con->originalAccessMethod = NULL;
if (!PartitionedTable(con->relationId))
if (!PartitionedTable(con->relationId) && !IsForeignTable(con->relationId))
{
HeapTuple amTuple = SearchSysCache1(AMOID, ObjectIdGetDatum(
relation->rd_rel->relam));
@ -1305,7 +1304,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
StringInfo query = makeStringInfo();
if (!PartitionedTable(sourceId))
if (!PartitionedTable(sourceId) && !IsForeignTable(sourceId))
{
if (!suppressNoticeMessages)
{
@ -1372,6 +1371,21 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
schemaName, targetName);
SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand);
}
else if (ShouldSyncTableMetadata(sourceId))
{
/*
* We are converting a citus local table to a distributed/reference table,
* so we should prevent dropping the sequence on the table. Otherwise, we'd
* lose track of the previous changes in the sequence.
*/
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency('%s');",
quote_qualified_identifier(schemaName, sourceName));
SendCommandToWorkersWithMetadata(command->data);
}
}
char *justBeforeDropCommand = NULL;
@ -1387,7 +1401,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}
resetStringInfo(query);
appendStringInfo(query, "DROP TABLE %s CASCADE",
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
quote_qualified_identifier(schemaName, sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);

View File

@ -797,6 +797,7 @@ GetDistributeObjectOps(Node *node)
return &Statistics_AlterObjectSchema;
}
case OBJECT_FOREIGN_TABLE:
case OBJECT_TABLE:
{
return &Table_AlterObjectSchema;

View File

@ -648,7 +648,7 @@ List *
PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
/*
* We will let Postgres deal with missing_ok
@ -1054,7 +1054,8 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
*/
Assert(IsCitusTable(rightRelationId));
}
else if (attachedRelationKind == RELKIND_RELATION)
else if (attachedRelationKind == RELKIND_RELATION ||
attachedRelationKind == RELKIND_FOREIGN_TABLE)
{
Assert(list_length(commandList) <= 1);
@ -1761,7 +1762,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
if (stmt->relation == NULL)
{
@ -2951,6 +2952,16 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
break;
}
case AT_GenericOptions:
{
if (IsForeignTable(relationId))
{
break;
}
}
/* fallthrough */
default:
{
ereport(ERROR,
@ -3326,7 +3337,7 @@ ObjectAddress
AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
const char *tableName = stmt->relation->relname;
Oid tableOid = InvalidOid;

View File

@ -267,15 +267,13 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
ErrorIfIllegallyChangingKnownShard(relationId);
char relationKind = get_rel_relkind(relationId);
if (IsCitusTable(relationId) &&
relationKind == RELKIND_FOREIGN_TABLE)
if (IsCitusTable(relationId) && IsForeignTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("truncating distributed foreign tables is "
"currently unsupported"),
errhint("Use citus_drop_all_shards to remove "
"foreign table's shards.")));
errhint("Consider undistributing table before TRUNCATE, "
"and then distribute or add to metadata again")));
}
}
}

View File

@ -63,6 +63,7 @@
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
@ -98,6 +99,8 @@ static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
static bool ShouldAddNewTableToMetadata(Node *parsetree);
static bool ServerUsesPostgresFDW(char *serverName);
static void ErrorIfOptionListHasNoTableName(List *optionList);
/*
@ -662,6 +665,29 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
PostprocessCreateTableStmt(createStatement, queryString);
}
if (IsA(parsetree, CreateForeignTableStmt))
{
CreateForeignTableStmt *createForeignTableStmt =
(CreateForeignTableStmt *) parsetree;
CreateStmt *createTableStmt = (CreateStmt *) (&createForeignTableStmt->base);
/*
* Error out with a hint if the foreign table is using postgres_fdw and
* the option table_name is not provided.
* Citus relays all the Citus local foreign table logic to the placement of the
* Citus local table. If table_name is NOT provided, Citus would try to talk to
* the foreign postgres table over the shard's table name, which would not exist
* on the remote server.
*/
if (ServerUsesPostgresFDW(createForeignTableStmt->servername))
{
ErrorIfOptionListHasNoTableName(createForeignTableStmt->options);
}
PostprocessCreateTableStmt(createTableStmt, queryString);
}
/* after local command has completed, finish by executing worker DDLJobs, if any */
if (ddlJobs != NIL)
{
@ -891,14 +917,24 @@ ShouldCheckUndistributeCitusLocalTables(void)
static bool
ShouldAddNewTableToMetadata(Node *parsetree)
{
if (!IsA(parsetree, CreateStmt))
CreateStmt *createTableStmt;
if (IsA(parsetree, CreateStmt))
{
/* if the command is not CREATE TABLE, we can early return false */
createTableStmt = (CreateStmt *) parsetree;
}
else if (IsA(parsetree, CreateForeignTableStmt))
{
CreateForeignTableStmt *createForeignTableStmt =
(CreateForeignTableStmt *) parsetree;
createTableStmt = (CreateStmt *) &(createForeignTableStmt->base);
}
else
{
/* if the command is not CREATE [FOREIGN] TABLE, we can early return false */
return false;
}
CreateStmt *createTableStmt = (CreateStmt *) parsetree;
if (createTableStmt->relation->relpersistence == RELPERSISTENCE_TEMP ||
createTableStmt->partbound != NULL)
{
@ -924,6 +960,50 @@ ShouldAddNewTableToMetadata(Node *parsetree)
}
/*
* ServerUsesPostgresFDW gets a foreign server name and returns true if the FDW that
* the server depends on is postgres_fdw. Returns false otherwise.
*/
static bool
ServerUsesPostgresFDW(char *serverName)
{
ForeignServer *server = GetForeignServerByName(serverName, false);
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
if (strcmp(fdw->fdwname, "postgres_fdw") == 0)
{
return true;
}
return false;
}
/*
* ErrorIfOptionListHasNoTableName gets an option list (DefElem) and errors out
* if the list does not contain a table_name element.
*/
static void
ErrorIfOptionListHasNoTableName(List *optionList)
{
char *table_nameString = "table_name";
DefElem *option = NULL;
foreach_ptr(option, optionList)
{
char *optionName = option->defname;
if (strcmp(optionName, table_nameString) == 0)
{
return;
}
}
ereport(ERROR, (errmsg(
"table_name option must be provided when using postgres_fdw with Citus"),
errhint("Provide the option \"table_name\" with value target table's"
" name")));
}
/*
* NotifyUtilityHookConstraintDropped sets ConstraintDropped to true to tell us
* last command dropped a table constraint.
@ -1432,3 +1512,26 @@ DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}
/*
* ColumnarTableSetOptionsHook propagates columnar table options to shards, if
* necessary.
*/
void
ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options)
{
if (EnableDDLPropagation && IsCitusTable(relationId))
{
/* when a columnar table is distributed update all settings on the shards */
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName,
relationName,
options);
DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command);
ExecuteDistributedDDLJob(ddljob);
}
}

View File

@ -14,6 +14,7 @@
#include "distributed/pg_version_constants.h"
#include "catalog/namespace.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"

View File

@ -30,7 +30,7 @@ DeparseAlterTableSchemaStmt(Node *node)
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
AppendAlterTableSchemaStmt(&str, stmt);
return str.data;
@ -40,8 +40,10 @@ DeparseAlterTableSchemaStmt(Node *node)
static void
AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{
Assert(stmt->objectType == OBJECT_TABLE);
appendStringInfo(buf, "ALTER TABLE ");
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
bool isForeignTable = stmt->objectType == OBJECT_FOREIGN_TABLE;
appendStringInfo(buf, "ALTER %sTABLE ", isForeignTable ? "FOREIGN " : "");
if (stmt->missing_ok)
{
appendStringInfo(buf, "IF EXISTS ");

View File

@ -29,7 +29,7 @@ void
QualifyAlterTableSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);
if (stmt->relation->schemaname == NULL)
{

View File

@ -679,21 +679,24 @@ MetadataCreateCommands(void)
/* after all tables are created, create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid clusteredTableId = cacheEntry->relationId;
Oid relationId = cacheEntry->relationId;
/* add the table metadata command first*/
char *metadataCommand = DistributionCreateCommand(cacheEntry);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
metadataCommand);
/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
if (!IsForeignTable(relationId))
{
/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
}
/* add the pg_dist_shard{,placement} entries */
List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
List *shardIntervalList = LoadShardIntervalList(relationId);
List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
@ -844,8 +847,11 @@ GetDistributedTableDDLEvents(Oid relationId)
commandList = lappend(commandList, metadataCommand);
/* commands to create the truncate trigger of the table */
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
if (!IsForeignTable(relationId))
{
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
}
/* commands to insert pg_dist_shard & pg_dist_placement entries */
List *shardIntervalList = LoadShardIntervalList(relationId);
@ -898,8 +904,16 @@ MetadataDropCommands(void)
dropSnapshotCommandList = list_concat(dropSnapshotCommandList,
detachPartitionCommandList);
/*
* We are re-creating the metadata, so not lose track of the
* sequences by preventing them dropped via DROP TABLE.
*/
dropSnapshotCommandList =
lappend(dropSnapshotCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
REMOVE_ALL_CITUS_TABLES_COMMAND);
dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);
dropSnapshotCommandList = lappend(dropSnapshotCommandList,

View File

@ -2162,3 +2162,14 @@ TableOwner(Oid relationId)
return GetUserNameFromId(userId, false);
}
/*
* IsForeignTable takes a relation id and returns true if it's a foreign table.
* Returns false otherwise.
*/
bool
IsForeignTable(Oid relationId)
{
return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE;
}

View File

@ -16,8 +16,10 @@
#include "distributed/argutils.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_manager.h"
#include "utils/builtins.h"
/* simple query to run on workers to check connectivity */

View File

@ -64,8 +64,6 @@
#include "utils/ruleutils.h"
#include "utils/varlena.h"
#include "columnar/columnar_tableam.h"
/* Shard related configuration */
int ShardCount = 32;
int ShardReplicationFactor = 1; /* desired replication factor for shards */
@ -80,6 +78,10 @@ static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_
int indexFlags);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
const ColumnarOptions *options);
static char * GetTableDDLCommandColumnar(void *context);
static TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_get_table_metadata);
@ -574,22 +576,6 @@ GetPreLoadTableCreationCommands(Oid relationId,
PushOverrideEmptySearchPath(CurrentMemoryContext);
/* if foreign table, fetch extension and server definitions */
char tableType = get_rel_relkind(relationId);
if (tableType == RELKIND_FOREIGN_TABLE)
{
char *extensionDef = pg_get_extensiondef_string(relationId);
char *serverDef = pg_get_serverdef_string(relationId);
if (extensionDef != NULL)
{
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(extensionDef));
}
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(serverDef));
}
/* fetch table schema and column option definitions */
char *tableSchemaDef = pg_get_tableschemadef_string(relationId,
includeSequenceDefaults,
@ -1018,3 +1004,118 @@ GetTableDDLCommand(TableDDLCommand *command)
/* unreachable: compiler should warn/error when not all cases are covered above */
ereport(ERROR, (errmsg("unsupported TableDDLCommand: %d", command->type)));
}
/*
* CitusCreateAlterColumnarTableSet generates a portable
*/
static char *
CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
const ColumnarOptions *options)
{
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf,
"SELECT alter_columnar_table_set(%s, "
"chunk_group_row_limit => %d, "
"stripe_row_limit => %lu, "
"compression_level => %d, "
"compression => %s);",
quote_literal_cstr(qualifiedRelationName),
options->chunkRowCount,
options->stripeRowCount,
options->compressionLevel,
quote_literal_cstr(CompressionTypeStr(options->compressionType)));
return buf.data;
}
/*
* GetTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a table. The resulting command will set the
* options of the table to the same options as the relation on the coordinator.
*/
static char *
GetTableDDLCommandColumnar(void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
tableDDLContext->relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* GetShardedTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a shard. The resulting command will set the
* options of the shard to the same options as the relation the shard is based on.
*/
char *
GetShardedTableDDLCommandColumnar(uint64 shardId, void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
/*
* AppendShardId is destructive of the original cahr *, given we want to serialize
* more than once we copy it before appending the shard id.
*/
char *relationName = pstrdup(tableDDLContext->relationName);
AppendShardIdToName(&relationName, shardId);
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* ColumnarGetCustomTableOptionsDDL returns a TableDDLCommand representing a command that
* will apply the passed columnar options to the relation identified by relationId on a
* new table or shard.
*/
TableDDLCommand *
ColumnarGetCustomTableOptionsDDL(char *schemaName, char *relationName,
ColumnarOptions options)
{
ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0(
sizeof(ColumnarTableDDLContext));
/* build the context */
context->schemaName = schemaName;
context->relationName = relationName;
context->options = options;
/* create TableDDLCommand based on the context build above */
return makeTableDDLCommandFunction(
GetTableDDLCommandColumnar,
GetShardedTableDDLCommandColumnar,
context);
}
/*
* ColumnarGetTableOptionsDDL returns a TableDDLCommand representing a command that will
* apply the columnar options currently applicable to the relation identified by
* relationId on a new table or shard.
*/
static TableDDLCommand *
ColumnarGetTableOptionsDDL(Oid relationId)
{
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
ColumnarOptions options = { 0 };
ReadColumnarOptions(relationId, &options);
return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options);
}

View File

@ -319,7 +319,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
foreach(colocatedTableCell, colocatedTableList)
{
Oid colocatedTableId = lfirst_oid(colocatedTableCell);
char relationKind = '\0';
/* check that user has owner rights in all co-located tables */
EnsureTableOwner(colocatedTableId);
@ -332,8 +331,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
*/
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
relationKind = get_rel_relkind(colocatedTableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
if (IsForeignTable(relationId))
{
char *relationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -659,7 +657,6 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid distributedTableId = shardInterval->relationId;
char relationKind = get_rel_relkind(distributedTableId);
char *tableOwner = TableOwner(shardInterval->relationId);
/* prevent table from being dropped */
@ -667,7 +664,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
EnsureTableOwner(distributedTableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
if (IsForeignTable(distributedTableId))
{
char *relationName = get_rel_name(distributedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -872,8 +869,7 @@ EnsureTableListSuitableForReplication(List *tableIdList)
Oid tableId = InvalidOid;
foreach_oid(tableId, tableIdList)
{
char relationKind = get_rel_relkind(tableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
if (IsForeignTable(tableId))
{
char *relationName = get_rel_name(tableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1462,7 +1458,7 @@ RecreateTableDDLCommandList(Oid relationId)
relationName);
StringInfo dropCommand = makeStringInfo();
char relationKind = get_rel_relkind(relationId);
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
/* build appropriate DROP command based on relation kind */
@ -1471,7 +1467,7 @@ RecreateTableDDLCommandList(Oid relationId)
appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND,
qualifiedRelationName);
}
else if (relationKind == RELKIND_FOREIGN_TABLE)
else if (IsForeignTable(relationId))
{
appendStringInfo(dropCommand, DROP_FOREIGN_TABLE_COMMAND,
qualifiedRelationName);

View File

@ -105,7 +105,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
char storageType = SHARD_STORAGE_TABLE;
Oid relationId = ResolveRelationId(relationNameText, false);
char relationKind = get_rel_relkind(relationId);
EnsureTablePermissions(relationId, ACL_INSERT);
CheckDistributedTable(relationId);
@ -127,7 +126,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
LockRelationOid(DistNodeRelationId(), RowShareLock);
/* set the storage type of foreign tables to 'f' */
if (relationKind == RELKIND_FOREIGN_TABLE)
if (IsForeignTable(relationId))
{
storageType = SHARD_STORAGE_FOREIGN;
}

View File

@ -72,7 +72,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
/* we don't extend names in extension or schema commands */
NodeTag nodeType = nodeTag(parseTree);
if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt ||
nodeType == T_CreateSeqStmt || nodeType == T_AlterSeqStmt)
nodeType == T_CreateSeqStmt || nodeType == T_AlterSeqStmt ||
nodeType == T_CreateForeignServerStmt)
{
return;
}
@ -276,30 +277,7 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
break;
}
case T_CreateForeignServerStmt:
{
CreateForeignServerStmt *serverStmt = (CreateForeignServerStmt *) parseTree;
char **serverName = &(serverStmt->servername);
AppendShardIdToName(serverName, shardId);
break;
}
case T_CreateForeignTableStmt:
{
CreateForeignTableStmt *createStmt = (CreateForeignTableStmt *) parseTree;
char **serverName = &(createStmt->servername);
AppendShardIdToName(serverName, shardId);
/*
* Since CreateForeignTableStmt inherits from CreateStmt and any change
* performed on CreateStmt should be done here too, we simply *fall
* through* to avoid code repetition.
*/
}
/* fallthrough */
case T_CreateStmt:
{
CreateStmt *createStmt = (CreateStmt *) parseTree;

View File

@ -311,6 +311,12 @@ _PG_init(void)
original_client_auth_hook = ClientAuthentication_hook;
ClientAuthentication_hook = CitusAuthHook;
/*
* When the options change on a columnar table, we may need to propagate
* the changes to shards.
*/
ColumnarTableSetOptions_hook = ColumnarTableSetOptionsHook;
InitializeMaintenanceDaemon();
/* initialize coordinated transaction management */

View File

@ -8,6 +8,8 @@
#include "udfs/citus_internal_add_object_metadata/11.0-1.sql"
#include "udfs/citus_run_local_command/11.0-1.sql"
#include "udfs/worker_drop_sequence_dependency/11.0-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);
DROP FUNCTION pg_catalog.master_get_table_metadata(text);

View File

@ -45,3 +45,4 @@ DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();
DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer);
DROP FUNCTION pg_catalog.citus_run_local_command(text);
DROP FUNCTION pg_catalog.worker_drop_sequence_dependency(text);

View File

@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS pg_catalog.worker_drop_sequence_dependency(table_name text);
CREATE OR REPLACE FUNCTION pg_catalog.worker_drop_sequence_dependency(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_sequence_dependency$$;
COMMENT ON FUNCTION pg_catalog.worker_drop_sequence_dependency(table_name text)
IS 'drop the Citus tables sequence dependency';

View File

@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS pg_catalog.worker_drop_sequence_dependency(table_name text);
CREATE OR REPLACE FUNCTION pg_catalog.worker_drop_sequence_dependency(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_sequence_dependency$$;
COMMENT ON FUNCTION pg_catalog.worker_drop_sequence_dependency(table_name text)
IS 'drop the Citus tables sequence dependency';

View File

@ -17,6 +17,10 @@
#include "access/heapam.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/pg_depend.h"
#if PG_VERSION_NUM < PG_VERSION_13
#include "catalog/pg_depend_d.h"
#endif
#include "catalog/pg_foreign_server.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/distribution_column.h"
@ -29,14 +33,19 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
PG_FUNCTION_INFO_V1(worker_drop_sequence_dependency);
#if PG_VERSION_NUM < PG_VERSION_13
static long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype,
Oid refclassId, Oid refobjectId);
#endif
/*
* worker_drop_distributed_table drops the distributed table with the given oid,
* then, removes the associated rows from pg_dist_partition, pg_dist_shard and
* pg_dist_placement. The function also drops the server for foreign tables.
* pg_dist_placement.
*
* Note that drop fails if any dependent objects are present for any of the
* distributed tables. Also, shard placements of the distributed tables are
@ -55,7 +64,6 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
Oid relationId = ResolveRelationId(relationName, true);
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
char relationKind = '\0';
if (!OidIsValid(relationId))
{
@ -70,7 +78,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
/* first check the relation type */
Relation distributedRelation = relation_open(relationId, AccessShareLock);
relationKind = distributedRelation->rd_rel->relkind;
EnsureRelationKindSupported(relationId);
/* close the relation since we do not need anymore */
@ -96,28 +104,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
UnmarkObjectDistributed(&ownedSequenceAddress);
}
/* drop the server for the foreign relations */
if (relationKind == RELKIND_FOREIGN_TABLE)
{
ObjectAddresses *objects = new_object_addresses();
ObjectAddress foreignServerObject = { InvalidOid, InvalidOid, 0 };
ForeignTable *foreignTable = GetForeignTable(relationId);
Oid serverId = foreignTable->serverid;
/* prepare foreignServerObject for dropping the server */
foreignServerObject.classId = ForeignServerRelationId;
foreignServerObject.objectId = serverId;
foreignServerObject.objectSubId = 0;
/* add the addresses that are going to be dropped */
add_exact_object_address(&distributedTableObject, objects);
add_exact_object_address(&foreignServerObject, objects);
/* drop both the table and the server */
performMultipleDeletions(objects, DROP_RESTRICT,
PERFORM_DELETION_INTERNAL);
}
else if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{
/*
* If the table is owned by an extension, we cannot drop it, nor should we
@ -153,3 +140,108 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* worker_drop_sequence_dependency is a UDF that removes the dependency
* of all the sequences for the given table.
*
* The main purpose of this UDF is to prevent dropping the sequences while
* re-creating the same table such as changing the shard count, converting
* a citus local table to a distributed table or re-syncing the metadata.
*/
Datum
worker_drop_sequence_dependency(PG_FUNCTION_ARGS)
{
text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName, true);
if (!OidIsValid(relationId))
{
ereport(NOTICE, (errmsg("relation %s does not exist, skipping",
text_to_cstring(relationName))));
PG_RETURN_VOID();
}
EnsureTableOwner(relationId);
/* break the dependent sequences from the table */
#if PG_VERSION_NUM >= PG_VERSION_13
List *ownedSequences = getOwnedSequences(relationId);
#else
List *ownedSequences = getOwnedSequences(relationId, InvalidAttrNumber);
#endif
Oid ownedSequenceOid = InvalidOid;
foreach_oid(ownedSequenceOid, ownedSequences)
{
/* the caller doesn't want to drop the sequence, so break the dependency */
deleteDependencyRecordsForSpecific(RelationRelationId, ownedSequenceOid,
DEPENDENCY_AUTO, RelationRelationId,
relationId);
}
if (list_length(ownedSequences) > 0)
{
/* if we delete at least one dependency, let next commands know */
CommandCounterIncrement();
}
PG_RETURN_VOID();
}
/* *INDENT-OFF* */
#if PG_VERSION_NUM < PG_VERSION_13
/*
* This function is already available on PG 13+.
* deleteDependencyRecordsForSpecific -- delete all records with given depender
* classId/objectId, dependee classId/objectId, of the given deptype.
* Returns the number of records deleted.
*/
static long
deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype,
Oid refclassId, Oid refobjectId)
{
long count = 0;
Relation depRel;
ScanKeyData key[2];
HeapTuple tup;
depRel = table_open(DependRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
Anum_pg_depend_classid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classId));
ScanKeyInit(&key[1],
Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(objectId));
SysScanDesc scan =
systable_beginscan(depRel, DependDependerIndexId, true,
NULL, 2, key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend depform = (Form_pg_depend) GETSTRUCT(tup);
if (depform->refclassid == refclassId &&
depform->refobjid == refobjectId &&
depform->deptype == deptype)
{
CatalogTupleDelete(depRel, &tup->t_self);
count++;
}
}
systable_endscan(scan);
table_close(depRel, RowExclusiveLock);
return count;
}
#endif
/* *INDENT-ON* */

View File

@ -62,18 +62,6 @@ typedef struct ColumnarOptions
} ColumnarOptions;
/*
* ColumnarTableDDLContext holds the instance variable for the TableDDLCommandFunction
* instance described below.
*/
typedef struct ColumnarTableDDLContext
{
char *schemaName;
char *relationName;
ColumnarOptions options;
} ColumnarTableDDLContext;
/* ColumnChunkSkipNode contains statistics for a ColumnChunkData. */
typedef struct ColumnChunkSkipNode
{
@ -209,11 +197,16 @@ typedef struct ColumnarReadState ColumnarReadState;
struct ColumnarWriteState;
typedef struct ColumnarWriteState ColumnarWriteState;
/* GUCs */
extern int columnar_compression;
extern int columnar_stripe_row_limit;
extern int columnar_chunk_group_row_limit;
extern int columnar_compression_level;
/* called when the user changes options on the given relation */
typedef void (*ColumnarTableSetOptions_hook_type)(Oid relid, ColumnarOptions options);
extern ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook;
extern void columnar_init_gucs(void);
extern CompressionType ParseCompressionType(const char *compressionTypeString);

View File

@ -60,8 +60,6 @@ extern TableScanDesc columnar_beginscan_extended(Relation relation, Snapshot sna
extern int64 ColumnarScanChunkGroupsFiltered(ColumnarScanDesc columnarScanDesc);
extern bool ColumnarSupportsIndexAM(char *indexAMName);
extern bool IsColumnarTableAmTable(Oid relationId);
extern TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context);
#endif /* COLUMNAR_TABLEAM_H */

View File

@ -247,6 +247,13 @@ extern ObjectAddress AlterForeignServerOwnerStmtObjectAddress(Node *node, bool
missing_ok);
extern List * GetForeignServerCreateDDLCommand(Oid serverId);
/* foreign_table.c - forward declarations */
extern List * PreprocessAlterForeignTableSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
/* function.c - forward declarations */
extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);

View File

@ -91,6 +91,7 @@ extern void UndistributeDisconnectedCitusLocalTables(void);
extern void NotifyUtilityHookConstraintDropped(void);
extern void ResetConstraintDropped(void);
extern void ExecuteDistributedDDLJob(DDLJob *ddlJob);
extern void ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options);
/* forward declarations for sending custom commands to a distributed table */
extern DDLJob * CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command);

View File

@ -21,6 +21,7 @@
#include "nodes/pg_list.h"
#include "distributed/metadata_utility.h"
#include "columnar/columnar.h"
/*
* In our distributed database, we need a mechanism to make remote procedure
@ -181,6 +182,17 @@ struct TableDDLCommand
};
};
/*
* ColumnarTableDDLContext holds the instance variable for the TableDDLCommandFunction
* instance described below.
*/
typedef struct ColumnarTableDDLContext
{
char *schemaName;
char *relationName;
ColumnarOptions options;
} ColumnarTableDDLContext;
/* make functions for TableDDLCommand */
extern TableDDLCommand * makeTableDDLCommandString(char *commandStr);
extern TableDDLCommand * makeTableDDLCommandFunction(TableDDLFunction function,
@ -190,7 +202,11 @@ extern TableDDLCommand * makeTableDDLCommandFunction(TableDDLFunction function,
extern char * GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId,
char *schemaName);
extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context);
extern char * GetTableDDLCommand(TableDDLCommand *command);
extern TableDDLCommand * ColumnarGetCustomTableOptionsDDL(char *schemaName,
char *relationName,
ColumnarOptions options);
/* Config variables managed via guc.c */

View File

@ -16,9 +16,9 @@
#include "c.h"
#include "nodes/pg_list.h"
#include "pg_version_compat.h"
#include "utils/array.h"
#include "utils/hsearch.h"
#include "distributed/version_compat.h"
/*

View File

@ -67,8 +67,11 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
#define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'"
#define DISABLE_OBJECT_PROPAGATION "SET citus.enable_object_propagation TO 'off'"

View File

@ -264,6 +264,7 @@ extern void EnsureTableNotDistributed(Oid relationId);
extern void EnsureRelationExists(Oid relationId);
extern bool RegularTable(Oid relationId);
extern bool TableEmpty(Oid tableId);
extern bool IsForeignTable(Oid relationId);
extern bool RelationUsesIdentityColumns(TupleDesc relationDesc);
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
extern uint64 GetFirstShardId(Oid relationId);

View File

@ -13,8 +13,6 @@
#include "postgres.h"
#include "distributed/pg_version_constants.h"
#include "access/sdir.h"
#include "access/heapam.h"
#include "commands/explain.h"
@ -30,113 +28,10 @@
#include "tcop/tcopprot.h"
#endif
#if PG_VERSION_NUM >= PG_VERSION_14
#define AlterTableStmtObjType_compat(a) ((a)->objtype)
#define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a, b)
#define getObjectIdentity_compat(a, b) getObjectIdentity(a, b)
#include "pg_version_compat.h"
/* for MemoryContextMethods->stats */
#define stats_compat(a, b, c, d, e) stats(a, b, c, d, e)
#define FuncnameGetCandidates_compat(a, b, c, d, e, f, g) \
FuncnameGetCandidates(a, b, c, d, e, f, g)
#define expand_function_arguments_compat(a, b, c, d) expand_function_arguments(a, b, c, d)
#define BeginCopyFrom_compat(a, b, c, d, e, f, g, h) BeginCopyFrom(a, b, c, d, e, f, g, h)
#define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \
standard_ProcessUtility(a, b, c, d, e, f, g, h)
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) \
ProcessUtility(a, b, c, d, e, f, g, h)
#define SetTuplestoreDestReceiverParams_compat(a, b, c, d, e, f) \
SetTuplestoreDestReceiverParams(a, b, c, d, e, f)
#define pgproc_statusflags_compat(pgproc) ((pgproc)->statusFlags)
#define get_partition_parent_compat(a, b) get_partition_parent(a, b)
#define RelationGetPartitionDesc_compat(a, b) RelationGetPartitionDesc(a, b)
#define make_simple_restrictinfo_compat(a, b) make_simple_restrictinfo(a, b)
#define pull_varnos_compat(a, b) pull_varnos(a, b)
#define pg_get_statisticsobj_worker_compat(a, b, c) pg_get_statisticsobj_worker(a, b, c)
#else
#define AlterTableStmtObjType_compat(a) ((a)->relkind)
#define F_NEXTVAL F_NEXTVAL_OID
#define ROLE_PG_MONITOR DEFAULT_ROLE_MONITOR
#define PROC_WAIT_STATUS_WAITING STATUS_WAITING
#define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a)
#define getObjectIdentity_compat(a, b) getObjectIdentity(a)
/* for MemoryContextMethods->stats */
#define stats_compat(a, b, c, d, e) stats(a, b, c, d)
#define FuncnameGetCandidates_compat(a, b, c, d, e, f, g) \
FuncnameGetCandidates(a, b, c, d, e, g)
#define expand_function_arguments_compat(a, b, c, d) expand_function_arguments(a, c, d)
#define VacOptValue VacOptTernaryValue
#define VACOPTVALUE_UNSPECIFIED VACOPT_TERNARY_DEFAULT
#define VACOPTVALUE_DISABLED VACOPT_TERNARY_DISABLED
#define VACOPTVALUE_ENABLED VACOPT_TERNARY_ENABLED
#define CopyFromState CopyState
#define BeginCopyFrom_compat(a, b, c, d, e, f, g, h) BeginCopyFrom(a, b, d, e, f, g, h)
#define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \
standard_ProcessUtility(a, b, d, e, f, g, h)
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h)
#define COPY_FRONTEND COPY_NEW_FE
#define SetTuplestoreDestReceiverParams_compat(a, b, c, d, e, f) \
SetTuplestoreDestReceiverParams(a, b, c, d)
#define pgproc_statusflags_compat(pgproc) \
((&ProcGlobal->allPgXact[(pgproc)->pgprocno])->vacuumFlags)
#define get_partition_parent_compat(a, b) get_partition_parent(a)
#define RelationGetPartitionDesc_compat(a, b) RelationGetPartitionDesc(a)
#define PQ_LARGE_MESSAGE_LIMIT 0
#define make_simple_restrictinfo_compat(a, b) make_simple_restrictinfo(b)
#define pull_varnos_compat(a, b) pull_varnos(b)
#define pg_get_statisticsobj_worker_compat(a, b, c) pg_get_statisticsobj_worker(a, c)
#endif
#if PG_VERSION_NUM >= PG_VERSION_13
#define lnext_compat(l, r) lnext(l, r)
#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c)
#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, q, c, b)
#define planner_compat(p, c, b) planner(p, NULL, c, b)
#define standard_planner_compat(a, c, d) standard_planner(a, NULL, c, d)
#define GetSequencesOwnedByRelation(a) getOwnedSequences(a)
#define GetSequencesOwnedByColumn(a, b) getOwnedSequences_internal(a, b, 0)
#define CMDTAG_SELECT_COMPAT CMDTAG_SELECT
#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) \
ExplainOnePlan(a, b, c, d, e, f, g, h)
#define SetListCellPtr(a, b) ((a)->ptr_value = (b))
#define RangeTableEntryFromNSItem(a) ((a)->p_rte)
#define QueryCompletionCompat QueryCompletion
#else /* pre PG13 */
#define lnext_compat(l, r) lnext(r)
#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c, p)
#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, c, b)
#define planner_compat(p, c, b) planner(p, c, b)
#define standard_planner_compat(a, c, d) standard_planner(a, c, d)
#define CMDTAG_SELECT_COMPAT "SELECT"
#define GetSequencesOwnedByRelation(a) getOwnedSequences(a, InvalidAttrNumber)
#define GetSequencesOwnedByColumn(a, b) getOwnedSequences(a, b)
#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g)
#define SetListCellPtr(a, b) ((a)->data.ptr_value = (b))
#define RangeTableEntryFromNSItem(a) (a)
#define QueryCompletionCompat char
#define varattnosyn varoattno
#define varnosyn varnoold
#endif
#if PG_VERSION_NUM >= PG_VERSION_12
#define CreateTableSlotForRel(rel) table_slot_create(rel, NULL)
#define MakeSingleTupleTableSlotCompat MakeSingleTupleTableSlot
#define AllocSetContextCreateExtended AllocSetContextCreateInternal
#define NextCopyFromCompat NextCopyFrom
#define ArrayRef SubscriptingRef
#define T_ArrayRef T_SubscriptingRef
#define or_clause is_orclause
#define GetSysCacheOid1Compat GetSysCacheOid1
#define GetSysCacheOid2Compat GetSysCacheOid2
#define GetSysCacheOid3Compat GetSysCacheOid3
#define GetSysCacheOid4Compat GetSysCacheOid4
#define fcGetArgValue(fc, n) ((fc)->args[n].value)
#define fcGetArgNull(fc, n) ((fc)->args[n].isnull)
#define fcSetArgExt(fc, n, val, is_null) \
(((fc)->args[n].isnull = (is_null)), ((fc)->args[n].value = (val)))
typedef struct
{
File fd;
@ -183,7 +78,4 @@ FileCompatFromFileStart(File fileDesc)
#endif /* PG12 */
#define fcSetArg(fc, n, value) fcSetArgExt(fc, n, value, false)
#define fcSetArgNull(fc, n) fcSetArgExt(fc, n, (Datum) 0, true)
#endif /* VERSION_COMPAT_H */

View File

@ -0,0 +1,128 @@
/*-------------------------------------------------------------------------
*
* pg_version_compat.h
* Compatibility macros for writing code agnostic to PostgreSQL versions
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_VERSION_COMPAT_H
#define PG_VERSION_COMPAT_H
#include "distributed/pg_version_constants.h"
#if PG_VERSION_NUM >= PG_VERSION_14
#define AlterTableStmtObjType_compat(a) ((a)->objtype)
#define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a, b)
#define getObjectIdentity_compat(a, b) getObjectIdentity(a, b)
/* for MemoryContextMethods->stats */
#define stats_compat(a, b, c, d, e) stats(a, b, c, d, e)
#define FuncnameGetCandidates_compat(a, b, c, d, e, f, g) \
FuncnameGetCandidates(a, b, c, d, e, f, g)
#define expand_function_arguments_compat(a, b, c, d) expand_function_arguments(a, b, c, d)
#define BeginCopyFrom_compat(a, b, c, d, e, f, g, h) BeginCopyFrom(a, b, c, d, e, f, g, h)
#define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \
standard_ProcessUtility(a, b, c, d, e, f, g, h)
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) \
ProcessUtility(a, b, c, d, e, f, g, h)
#define SetTuplestoreDestReceiverParams_compat(a, b, c, d, e, f) \
SetTuplestoreDestReceiverParams(a, b, c, d, e, f)
#define pgproc_statusflags_compat(pgproc) ((pgproc)->statusFlags)
#define get_partition_parent_compat(a, b) get_partition_parent(a, b)
#define RelationGetPartitionDesc_compat(a, b) RelationGetPartitionDesc(a, b)
#define make_simple_restrictinfo_compat(a, b) make_simple_restrictinfo(a, b)
#define pull_varnos_compat(a, b) pull_varnos(a, b)
#define pg_get_statisticsobj_worker_compat(a, b, c) pg_get_statisticsobj_worker(a, b, c)
#else
#define AlterTableStmtObjType_compat(a) ((a)->relkind)
#define F_NEXTVAL F_NEXTVAL_OID
#define ROLE_PG_MONITOR DEFAULT_ROLE_MONITOR
#define PROC_WAIT_STATUS_WAITING STATUS_WAITING
#define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a)
#define getObjectIdentity_compat(a, b) getObjectIdentity(a)
/* for MemoryContextMethods->stats */
#define stats_compat(a, b, c, d, e) stats(a, b, c, d)
#define FuncnameGetCandidates_compat(a, b, c, d, e, f, g) \
FuncnameGetCandidates(a, b, c, d, e, g)
#define expand_function_arguments_compat(a, b, c, d) expand_function_arguments(a, c, d)
#define VacOptValue VacOptTernaryValue
#define VACOPTVALUE_UNSPECIFIED VACOPT_TERNARY_DEFAULT
#define VACOPTVALUE_DISABLED VACOPT_TERNARY_DISABLED
#define VACOPTVALUE_ENABLED VACOPT_TERNARY_ENABLED
#define CopyFromState CopyState
#define BeginCopyFrom_compat(a, b, c, d, e, f, g, h) BeginCopyFrom(a, b, d, e, f, g, h)
#define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \
standard_ProcessUtility(a, b, d, e, f, g, h)
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h)
#define COPY_FRONTEND COPY_NEW_FE
#define SetTuplestoreDestReceiverParams_compat(a, b, c, d, e, f) \
SetTuplestoreDestReceiverParams(a, b, c, d)
#define pgproc_statusflags_compat(pgproc) \
((&ProcGlobal->allPgXact[(pgproc)->pgprocno])->vacuumFlags)
#define get_partition_parent_compat(a, b) get_partition_parent(a)
#define RelationGetPartitionDesc_compat(a, b) RelationGetPartitionDesc(a)
#define PQ_LARGE_MESSAGE_LIMIT 0
#define make_simple_restrictinfo_compat(a, b) make_simple_restrictinfo(b)
#define pull_varnos_compat(a, b) pull_varnos(b)
#define pg_get_statisticsobj_worker_compat(a, b, c) pg_get_statisticsobj_worker(a, c)
#endif
#if PG_VERSION_NUM >= PG_VERSION_13
#define lnext_compat(l, r) lnext(l, r)
#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c)
#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, q, c, b)
#define planner_compat(p, c, b) planner(p, NULL, c, b)
#define standard_planner_compat(a, c, d) standard_planner(a, NULL, c, d)
#define GetSequencesOwnedByRelation(a) getOwnedSequences(a)
#define GetSequencesOwnedByColumn(a, b) getOwnedSequences_internal(a, b, 0)
#define CMDTAG_SELECT_COMPAT CMDTAG_SELECT
#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) \
ExplainOnePlan(a, b, c, d, e, f, g, h)
#define SetListCellPtr(a, b) ((a)->ptr_value = (b))
#define RangeTableEntryFromNSItem(a) ((a)->p_rte)
#define QueryCompletionCompat QueryCompletion
#else /* pre PG13 */
#define lnext_compat(l, r) lnext(r)
#define list_delete_cell_compat(l, c, p) list_delete_cell(l, c, p)
#define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, c, b)
#define planner_compat(p, c, b) planner(p, c, b)
#define standard_planner_compat(a, c, d) standard_planner(a, c, d)
#define CMDTAG_SELECT_COMPAT "SELECT"
#define GetSequencesOwnedByRelation(a) getOwnedSequences(a, InvalidAttrNumber)
#define GetSequencesOwnedByColumn(a, b) getOwnedSequences(a, b)
#define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g)
#define SetListCellPtr(a, b) ((a)->data.ptr_value = (b))
#define RangeTableEntryFromNSItem(a) (a)
#define QueryCompletionCompat char
#define varattnosyn varoattno
#define varnosyn varnoold
#endif
#if PG_VERSION_NUM >= PG_VERSION_12
#define CreateTableSlotForRel(rel) table_slot_create(rel, NULL)
#define MakeSingleTupleTableSlotCompat MakeSingleTupleTableSlot
#define AllocSetContextCreateExtended AllocSetContextCreateInternal
#define NextCopyFromCompat NextCopyFrom
#define ArrayRef SubscriptingRef
#define T_ArrayRef T_SubscriptingRef
#define or_clause is_orclause
#define GetSysCacheOid1Compat GetSysCacheOid1
#define GetSysCacheOid2Compat GetSysCacheOid2
#define GetSysCacheOid3Compat GetSysCacheOid3
#define GetSysCacheOid4Compat GetSysCacheOid4
#define fcGetArgValue(fc, n) ((fc)->args[n].value)
#define fcGetArgNull(fc, n) ((fc)->args[n].isnull)
#define fcSetArgExt(fc, n, val, is_null) \
(((fc)->args[n].isnull = (is_null)), ((fc)->args[n].value = (val)))
#endif /* PG12 */
#define fcSetArg(fc, n, value) fcSetArgExt(fc, n, value, false)
#define fcSetArgNull(fc, n) fcSetArgExt(fc, n, (Datum) 0, true)
#endif /* PG_VERSION_COMPAT_H */

View File

@ -225,9 +225,6 @@ CREATE FOREIGN TABLE foreign_table (
-- observe that we do not create fdw server for shell table, both shard relation
-- & shell relation points to the same same server object
SELECT citus_add_local_table_to_metadata('foreign_table');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: server "fake_fdw_server" already exists, skipping
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
citus_add_local_table_to_metadata
---------------------------------------------------------------------

View File

@ -725,4 +725,5 @@ $$);
(2 rows)
-- cleanup at exit
set client_min_messages to error;
DROP SCHEMA citus_local_tables_mx CASCADE;

View File

@ -0,0 +1,424 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1508000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
CREATE SCHEMA foreign_tables_schema_mx;
SET search_path TO foreign_tables_schema_mx;
-- test adding foreign table to metadata with the guc
SET citus.use_citus_managed_tables TO ON;
CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial);
INSERT INTO foreign_table_test VALUES (1, 'text_test');
CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
CREATE USER MAPPING FOR CURRENT_USER
SERVER foreign_server
OPTIONS (user 'postgres');
CREATE FOREIGN TABLE foreign_table (
id integer NOT NULL,
data text,
a bigserial
)
SERVER foreign_server
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
--verify
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
partmethod | repmodel
---------------------------------------------------------------------
n | s
(1 row)
CREATE TABLE parent_for_foreign_tables (
project_id integer
) PARTITION BY HASH (project_id);
CREATE SERVER IF NOT EXISTS srv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
CREATE SERVER IF NOT EXISTS srv2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
CREATE SERVER IF NOT EXISTS srv3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (table_name 'dummy');
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (table_name 'dummy');
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv3 OPTIONS (table_name 'dummy');
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
partmethod | repmodel
---------------------------------------------------------------------
n | s
n | s
n | s
n | s
(4 rows)
ALTER FOREIGN TABLE foreign_table SET SCHEMA public;
ALTER FOREIGN TABLE public.foreign_table RENAME TO foreign_table_newname;
ALTER FOREIGN TABLE public.foreign_table_newname RENAME COLUMN id TO id_test;
ALTER FOREIGN TABLE public.foreign_table_newname ADD dummy_col bigint NOT NULL DEFAULT 1;
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col DROP DEFAULT;
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col SET DEFAULT 2;
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col TYPE int;
ALTER TABLE foreign_table_test RENAME COLUMN id TO id_test;
ALTER TABLE foreign_table_test ADD dummy_col int NOT NULL DEFAULT 1;
INSERT INTO public.foreign_table_newname VALUES (2, 'test_2');
INSERT INTO foreign_table_test VALUES (3, 'test_3');
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c check(id_test < 1000);
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint check_c;
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check(id_test < 1000) NOT VALID;
ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
-- trigger test
CREATE TABLE distributed_table(value int);
SELECT create_distributed_table('distributed_table', 'value');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
BEGIN
INSERT INTO distributed_table VALUES (42);
RETURN NEW;
END;
$insert_42$ LANGUAGE plpgsql;
CREATE TRIGGER insert_42_trigger
AFTER DELETE ON public.foreign_table_newname
FOR EACH ROW EXECUTE FUNCTION insert_42();
-- do the same pattern from the workers as well
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
select * from distributed_table ORDER BY value;
value
---------------------------------------------------------------------
42
(1 row)
-- disable trigger
alter foreign table public.foreign_table_newname disable trigger insert_42_trigger;
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
-- should not insert again as trigger disabled
select * from distributed_table ORDER BY value;
value
---------------------------------------------------------------------
42
(1 row)
DROP TRIGGER insert_42_trigger ON public.foreign_table_newname;
-- should throw errors
select alter_table_set_access_method('public.foreign_table_newname', 'columnar');
ERROR: cannot complete operation because it is a foreign table
select alter_distributed_table('public.foreign_table_newname', shard_count:=4);
ERROR: cannot alter table because the table is not distributed
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO pg_monitor;
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,pg_monitor)
(localhost,57638,t,pg_monitor)
(2 rows)
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO postgres;
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,postgres)
(localhost,57638,t,postgres)
(2 rows)
\c - - - :worker_1_port
SET search_path TO foreign_tables_schema_mx;
SELECT * FROM public.foreign_table_newname ORDER BY id_test;
id_test | data | a | dummy_col
---------------------------------------------------------------------
1 | text_test | 1 | 1
2 | test_2 | 1 | 2
3 | test_3 | 2 | 1
(3 rows)
SELECT * FROM foreign_table_test ORDER BY id_test;
id_test | data | a | dummy_col
---------------------------------------------------------------------
1 | text_test | 1 | 1
2 | test_2 | 1 | 2
3 | test_3 | 2 | 1
(3 rows)
-- should error out
ALTER FOREIGN TABLE public.foreign_table_newname DROP COLUMN id;
ERROR: operation is not allowed on this node
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
partmethod | repmodel
---------------------------------------------------------------------
n | s
n | s
n | s
n | s
(4 rows)
\c - - - :master_port
ALTER FOREIGN TABLE foreign_table_newname RENAME TO foreign_table;
SET search_path TO foreign_tables_schema_mx;
ALTER FOREIGN TABLE public.foreign_table SET SCHEMA foreign_tables_schema_mx;
ALTER FOREIGN TABLE IF EXISTS foreign_table RENAME COLUMN id_test TO id;
ALTER TABLE foreign_table_test RENAME COLUMN id_test TO id;
ALTER FOREIGN TABLE foreign_table DROP COLUMN id;
ALTER FOREIGN TABLE foreign_table DROP COLUMN dummy_col;
ALTER TABLE foreign_table_test DROP COLUMN dummy_col;
ALTER FOREIGN TABLE foreign_table OPTIONS (DROP schema_name, SET table_name 'notable');
SELECT run_command_on_workers($$SELECT f.ftoptions FROM pg_foreign_table f JOIN pg_class c ON f.ftrelid=c.oid WHERE c.relname = 'foreign_table';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,{table_name=notable})
(localhost,57638,t,{table_name=notable})
(2 rows)
ALTER FOREIGN TABLE foreign_table OPTIONS (ADD schema_name 'foreign_tables_schema_mx', SET table_name 'foreign_table_test');
SELECT * FROM foreign_table ORDER BY a;
data | a
---------------------------------------------------------------------
text_test | 1
test_2 | 1
test_3 | 2
(3 rows)
-- test alter user mapping
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'nonexistiniguser');
-- should fail
SELECT * FROM foreign_table ORDER BY a;
ERROR: could not connect to server "foreign_server"
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'postgres');
-- test undistributing
DELETE FROM foreign_table;
SELECT undistribute_table('foreign_table');
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('foreign_table','data');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('foreign_table');
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('foreign_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('foreign_table');
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
undistribute_table
---------------------------------------------------------------------
(1 row)
INSERT INTO foreign_table_test VALUES (1, 'testt');
SELECT * FROM foreign_table ORDER BY a;
data | a
---------------------------------------------------------------------
testt | 3
(1 row)
SELECT * FROM foreign_table_test ORDER BY a;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
(1 row)
DROP TABLE parent_for_foreign_tables;
CREATE TABLE parent_for_foreign_tables (id integer NOT NULL, data text, a bigserial)
PARTITION BY HASH (id);
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
SELECT citus_add_local_table_to_metadata('parent_for_foreign_tables');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
partmethod | repmodel
---------------------------------------------------------------------
n | s
n | s
n | s
n | s
(4 rows)
CREATE USER MAPPING FOR CURRENT_USER
SERVER srv1
OPTIONS (user 'postgres');
CREATE USER MAPPING FOR CURRENT_USER
SERVER srv2
OPTIONS (user 'postgres');
SELECT * FROM parent_for_foreign_tables ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
1 | testt | 3
1 | testt | 3
(3 rows)
SELECT * FROM foreign_partition_1 ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
(1 row)
SELECT * FROM foreign_partition_2 ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
(1 row)
SELECT * FROM foreign_partition_3 ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
(1 row)
\c - - - :worker_1_port
SET search_path TO foreign_tables_schema_mx;
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
partmethod | repmodel
---------------------------------------------------------------------
n | s
n | s
n | s
n | s
(4 rows)
SELECT * FROM parent_for_foreign_tables ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
1 | testt | 3
1 | testt | 3
(3 rows)
SELECT * FROM foreign_partition_1 ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
(1 row)
SELECT * FROM foreign_partition_2 ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
(1 row)
SELECT * FROM foreign_partition_3 ORDER BY id;
id | data | a
---------------------------------------------------------------------
1 | testt | 3
(1 row)
\c - - - :master_port
SET search_path TO foreign_tables_schema_mx;
--verify
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
partmethod | repmodel
---------------------------------------------------------------------
(0 rows)
CREATE SERVER foreign_server_local
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
CREATE USER MAPPING FOR CURRENT_USER
SERVER foreign_server_local
OPTIONS (user 'postgres');
CREATE FOREIGN TABLE foreign_table_local (
id integer NOT NULL,
data text
)
SERVER foreign_server_local
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
CREATE TABLE dist_tbl(a int);
INSERT INTO dist_tbl VALUES (1);
SELECT create_distributed_table('dist_tbl','a');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
a | id | data
---------------------------------------------------------------------
1 | 1 | testt
(1 row)
CREATE TABLE ref_tbl(a int);
INSERT INTO ref_tbl VALUES (1);
SELECT create_reference_table('ref_tbl');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
a | id | data
---------------------------------------------------------------------
1 | 1 | testt
(1 row)
SELECT citus_add_local_table_to_metadata('foreign_table_local');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO foreign_tables_schema_mx;
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
a | id | data
---------------------------------------------------------------------
1 | 1 | testt
(1 row)
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
a | id | data
---------------------------------------------------------------------
1 | 1 | testt
(1 row)
\c - - - :master_port
SET search_path TO foreign_tables_schema_mx;
-- should error out because doesn't have a table_name field
CREATE FOREIGN TABLE foreign_table_local_fails (
id integer NOT NULL,
data text
)
SERVER foreign_server_local
OPTIONS (schema_name 'foreign_tables_schema_mx');
ERROR: table_name option must be provided when using postgres_fdw with Citus
DROP FOREIGN TABLE foreign_table_local;
-- cleanup at exit
set client_min_messages to error;
DROP SCHEMA foreign_tables_schema_mx CASCADE;

View File

@ -64,8 +64,6 @@ CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server;
SELECT create_distributed_table('foreign_distributed_table', 'a');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------

View File

@ -428,8 +428,6 @@ SELECT create_distributed_table('table_range', 'id', 'range');
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------

View File

@ -159,8 +159,6 @@ SERVER fake_fdw_server;
SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------

View File

@ -1005,7 +1005,8 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_disable_node(text,integer,boolean) void
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void
| function citus_run_local_command(text) void
(9 rows)
| function worker_drop_sequence_dependency(text) void
(10 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -172,8 +172,6 @@ CREATE FOREIGN TABLE foreign_table (
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
SELECT create_distributed_table('foreign_table', 'id');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------
@ -200,13 +198,11 @@ order by table_name;
\c - - :master_host :master_port
SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
master_get_table_ddl_events
---------------------------------------------------------------------
CREATE SERVER IF NOT EXISTS fake_fdw_server FOREIGN DATA WRAPPER fake_fdw
CREATE FOREIGN TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 (id bigint NOT NULL, rename_name character(8) DEFAULT ''::text NOT NULL) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true')
ALTER TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 OWNER TO postgres
(3 rows)
(2 rows)
-- propagating views is not supported
CREATE VIEW local_view AS SELECT * FROM simple_table;

View File

@ -41,14 +41,15 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands,
-- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
unnest
---------------------------------------------------------------------
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE)
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
(5 rows)
(6 rows)
-- this function is dropped in Citus10, added here for tests
CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
@ -97,6 +98,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE)
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -107,7 +109,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(18 rows)
(19 rows)
-- Show that CREATE INDEX commands are included in the metadata snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
@ -123,6 +125,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE)
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -133,7 +136,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(19 rows)
(20 rows)
-- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema;
@ -150,6 +153,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE)
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -160,7 +164,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(19 rows)
(20 rows)
-- Show that append distributed tables are not included in the metadata snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -183,6 +187,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE)
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -193,7 +198,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(19 rows)
(20 rows)
-- Show that range distributed tables are not included in the metadata snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
@ -209,6 +214,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default', TRUE)
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -219,7 +225,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(19 rows)
(20 rows)
-- Test start_metadata_sync_to_node UDF
-- Ensure that hasmetadata=false for all nodes
@ -300,7 +306,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
(4 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 0 | s | f
(1 row)
@ -437,7 +443,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
(4 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 0 | s | f
(1 row)
@ -1692,7 +1698,7 @@ ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('mx_test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('mx_test_sequence_1');
SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
unnest
---------------------------------------------------------------------
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.mx_test_sequence_0 OWNER TO postgres
@ -1731,6 +1737,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10005, 's')
SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10003, 't')
SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10005, 's')
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_sequence_0 AS integer INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1 NO CYCLE','integer')
@ -1758,7 +1765,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(64 rows)
(65 rows)
-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;

View File

@ -103,8 +103,6 @@ CREATE FOREIGN TABLE remote_engagements (
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('remote_engagements', 'id', 'hash');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------

View File

@ -70,6 +70,12 @@ CREATE OR REPLACE FUNCTION square(a INT) RETURNS INT AS $$
BEGIN
RETURN a*a;
END; $$ LANGUAGE PLPGSQL STABLE;
SELECT create_distributed_function('square(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE TABLE citus_local_table(a int, b int DEFAULT square(10));
SELECT citus_add_local_table_to_metadata('citus_local_table');
citus_add_local_table_to_metadata

View File

@ -15,6 +15,26 @@ SET citus.enable_ddl_propagation TO ON;
CREATE SERVER foreign_server_dependent_schema
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
CREATE FOREIGN TABLE foreign_table (
id integer NOT NULL,
data text
)
SERVER foreign_server_dependent_schema
OPTIONS (schema_name 'test_dependent_schema', table_name 'foreign_table_test');
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId=>0);
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_add_local_table_to_metadata('foreign_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
ALTER TABLE foreign_table OWNER TO pg_monitor;
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
@ -38,6 +58,14 @@ SELECT run_command_on_workers(
(localhost,57638,t,t)
(2 rows)
-- verify the owner is altered on workers
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,pg_monitor)
(localhost,57638,t,pg_monitor)
(2 rows)
CREATE SERVER foreign_server_to_drop
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
@ -45,6 +73,13 @@ CREATE SERVER foreign_server_to_drop
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
ERROR: cannot drop distributed server with other servers
HINT: Try dropping each object in a separate DROP command
DROP FOREIGN TABLE foreign_table;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO ERROR;
DROP SCHEMA test_dependent_schema CASCADE;
RESET client_min_messages;

View File

@ -0,0 +1,356 @@
CREATE SCHEMA resync_metadata_with_sequences;
SET search_path TO resync_metadata_with_sequences;
CREATE TABLE test_serial(a bigserial PRIMARY KEY);
SELECT create_distributed_table('test_serial', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SEQUENCE myseq;
CREATE TABLE test_sequence(a bigint DEFAULT nextval('myseq'));
SELECT create_distributed_table('test_sequence', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SEQUENCE myseq_ref;
CREATE TABLE test_serial_ref(a bigserial PRIMARY KEY, b bigint DEFAULT nextval('myseq_ref'));
SELECT create_reference_table('test_serial_ref');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
1 | 1
(1 row)
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
2 | 2
(1 row)
SET client_min_messages TO ERROR;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid:=0);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE SEQUENCE myseq_locl_to_dist;
CREATE TABLE test_local_to_dist(a bigserial PRIMARY KEY, b bigint DEFAULT nextval('myseq_locl_to_dist'));
SELECT citus_add_local_table_to_metadata('test_local_to_dist');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
1 | 1
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
2 | 2
(1 row)
SET citus.shard_replication_factor TO 1;
CREATE SEQUENCE other_id_seq;
CREATE TABLE sensors(
measureid bigserial,
other_id bigint DEFAULT nextval('other_id_seq'),
eventdatetime date) PARTITION BY RANGE(eventdatetime);
CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949185
(1 row)
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949186
(1 row)
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949185
(1 row)
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949186
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949185 | 3940649673949185
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949186 | 3940649673949186
(1 row)
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949185 | 3940649673949185
(1 row)
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949186 | 3940649673949186
(1 row)
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
3940649673949185 | 3940649673949185 | 01-01-2010
(1 row)
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
3940649673949186 | 3940649673949186 | 01-01-2021
(1 row)
\c - - - :master_port
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
-- can continue inserting with the existing sequence/serial
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949187
(1 row)
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949188
(1 row)
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949187
(1 row)
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
a
---------------------------------------------------------------------
3940649673949188
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949187 | 3940649673949187
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949188 | 3940649673949188
(1 row)
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949187 | 3940649673949187
(1 row)
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949188 | 3940649673949188
(1 row)
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
3940649673949187 | 3940649673949187 | 01-01-2010
(1 row)
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
3940649673949188 | 3940649673949188 | 01-01-2021
(1 row)
\c - - - :master_port
SET search_path tO resync_metadata_with_sequences;
SELECT create_distributed_table('test_local_to_dist', 'a', colocate_with:='none');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$resync_metadata_with_sequences.test_local_to_dist$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3 | 3
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
4 | 4
(1 row)
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949189 | 3940649673949189
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949190 | 3940649673949190
(1 row)
\c - - - :master_port
SET search_path tO resync_metadata_with_sequences;
SELECT alter_distributed_table('test_local_to_dist', shard_count:=6);
NOTICE: creating a new table for resync_metadata_with_sequences.test_local_to_dist
NOTICE: moving the data of resync_metadata_with_sequences.test_local_to_dist
NOTICE: dropping the old resync_metadata_with_sequences.test_local_to_dist
NOTICE: renaming the new table to resync_metadata_with_sequences.test_local_to_dist
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SET citus.shard_replication_factor TO 1;
SELECT alter_distributed_table('sensors', shard_count:=5);
NOTICE: converting the partitions of resync_metadata_with_sequences.sensors
NOTICE: creating a new table for resync_metadata_with_sequences.sensors_old
NOTICE: moving the data of resync_metadata_with_sequences.sensors_old
NOTICE: dropping the old resync_metadata_with_sequences.sensors_old
NOTICE: renaming the new table to resync_metadata_with_sequences.sensors_old
NOTICE: creating a new table for resync_metadata_with_sequences.sensors_2020_01_01
NOTICE: moving the data of resync_metadata_with_sequences.sensors_2020_01_01
NOTICE: dropping the old resync_metadata_with_sequences.sensors_2020_01_01
NOTICE: renaming the new table to resync_metadata_with_sequences.sensors_2020_01_01
NOTICE: creating a new table for resync_metadata_with_sequences.sensors_news
NOTICE: moving the data of resync_metadata_with_sequences.sensors_news
NOTICE: dropping the old resync_metadata_with_sequences.sensors_news
NOTICE: renaming the new table to resync_metadata_with_sequences.sensors_news
NOTICE: creating a new table for resync_metadata_with_sequences.sensors
NOTICE: dropping the old resync_metadata_with_sequences.sensors
NOTICE: renaming the new table to resync_metadata_with_sequences.sensors
alter_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
5 | 5
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
6 | 6
(1 row)
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
1 | 1 | 01-01-2010
(1 row)
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
2 | 2 | 01-01-2021
(1 row)
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949191 | 3940649673949191
(1 row)
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
a | b
---------------------------------------------------------------------
3940649673949192 | 3940649673949192
(1 row)
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
3940649673949189 | 3940649673949189 | 01-01-2010
(1 row)
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
measureid | other_id | eventdatetime
---------------------------------------------------------------------
3940649673949190 | 3940649673949190 | 01-01-2021
(1 row)
\c - - - :master_port
SET search_path tO resync_metadata_with_sequences;
DROP TABLE test_serial, test_sequence;
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
-- show that we only have the sequences left after
-- dropping the tables (e.g., bigserial is dropped)
select count(*) from pg_sequences where schemaname ilike '%resync_metadata_with_sequences%';
count
---------------------------------------------------------------------
7
(1 row)
\c - - - :master_port
SET client_min_messages TO ERROR;
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
DROP SCHEMA resync_metadata_with_sequences CASCADE;

View File

@ -136,13 +136,26 @@ SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, met
0 | localhost | 57636 | t | t | t | t
(1 row)
-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
BEGIN;
-- we should not enable MX for this temporary node just because
-- it'd spawn a bg worker targeting this node
-- and that changes the connection count specific tests
-- here
SET LOCAL citus.enable_metadata_sync_by_default TO OFF;
-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
ERROR: cannot add a worker node when the coordinator hostname is set to localhost
DETAIL: Worker nodes need to be able to connect to the coordinator to transfer data.
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
COMMIT;
BEGIN;
-- we should not enable MX for this temporary node just because
-- it'd spawn a bg worker targeting this node
-- and that changes the connection count specific tests
-- here
SET LOCAL citus.enable_metadata_sync_by_default TO OFF;
-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
NOTICE: shards are still on the coordinator after adding the new node
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('localhost',57636); to permanently move shards away from the coordinator.
?column?
@ -150,6 +163,8 @@ HINT: Use SELECT rebalance_table_shards(); to balance shards data between worke
1
(1 row)
COMMIT;
-- we don't need this node anymore
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
@ -163,8 +178,14 @@ SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');
1
(1 row)
-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
BEGIN;
-- we should not enable MX for this temporary node just because
-- it'd spawn a bg worker targeting this node
-- and that changes the connection count specific tests
-- here
SET LOCAL citus.enable_metadata_sync_by_default TO OFF;
-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
NOTICE: shards are still on the coordinator after adding the new node
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('127.0.0.1',57636); to permanently move shards away from the coordinator.
?column?
@ -172,6 +193,8 @@ HINT: Use SELECT rebalance_table_shards(); to balance shards data between worke
1
(1 row)
COMMIT;
-- we don't need this node anymore
SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);
?column?
---------------------------------------------------------------------

View File

@ -156,7 +156,7 @@ SELECT * FROM test_matview;
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'events%' ORDER BY logicalrelid::text;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
---------------------------------------------------------------------
events | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f

View File

@ -145,15 +145,20 @@ CREATE FOREIGN TABLE foreign_table (
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
SELECT create_distributed_table('foreign_table', 'id');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('foreign_table');
ERROR: cannot complete operation because it is a foreign table
NOTICE: creating a new table for undistribute_table.foreign_table
NOTICE: dropping the old undistribute_table.foreign_table
NOTICE: renaming the new table to undistribute_table.foreign_table
undistribute_table
---------------------------------------------------------------------
(1 row)
DROP FOREIGN TABLE foreign_table;
SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
start_metadata_sync_to_node

View File

@ -204,6 +204,7 @@ ORDER BY 1;
function worker_create_schema(bigint,text)
function worker_create_truncate_trigger(regclass)
function worker_drop_distributed_table(text)
function worker_drop_sequence_dependency(text)
function worker_fetch_foreign_file(text,text,bigint,text[],integer[])
function worker_fetch_partition_file(bigint,integer,integer,integer,text,integer)
function worker_fix_partition_shard_index_names(regclass,text,text)
@ -263,5 +264,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(247 rows)
(248 rows)

View File

@ -16,10 +16,8 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: turn_mx_off
test: single_node
test: single_node_truncate
test: turn_mx_on
test: multi_test_helpers multi_test_helpers_superuser
test: multi_cluster_management
@ -280,8 +278,9 @@ test: foreign_key_to_reference_table
test: replicate_reference_tables_to_coordinator
test: turn_mx_off
test: citus_local_tables
test: multi_row_router_insert mixed_relkind_tests
test: mixed_relkind_tests
test: turn_mx_on
test: multi_row_router_insert
test: multi_reference_table citus_local_tables_queries
test: citus_local_table_triggers
test: coordinator_shouldhaveshards

View File

@ -49,6 +49,7 @@ test: local_shard_copy
test: undistribute_table_cascade_mx
test: citus_local_tables_mx
test: citus_local_tables_queries_mx
test: foreign_tables_mx
test: multi_mx_transaction_recovery
test: multi_mx_modifying_xacts
test: multi_mx_explain
@ -57,6 +58,7 @@ test: multi_mx_insert_select_repartition
test: locally_execute_intermediate_results
test: multi_mx_alter_distributed_table
test: update_colocation_mx
test: resync_metadata_with_sequences
# should be executed sequentially because it modifies metadata
test: local_shard_execution_dropped_column

View File

@ -381,5 +381,7 @@ SELECT run_command_on_workers(
$$
SELECT count(*) FROM pg_catalog.pg_tables WHERE tablename='citus_local_table_4'
$$);
-- cleanup at exit
set client_min_messages to error;
DROP SCHEMA citus_local_tables_mx CASCADE;

View File

@ -0,0 +1,236 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1508000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
CREATE SCHEMA foreign_tables_schema_mx;
SET search_path TO foreign_tables_schema_mx;
-- test adding foreign table to metadata with the guc
SET citus.use_citus_managed_tables TO ON;
CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial);
INSERT INTO foreign_table_test VALUES (1, 'text_test');
CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
CREATE USER MAPPING FOR CURRENT_USER
SERVER foreign_server
OPTIONS (user 'postgres');
CREATE FOREIGN TABLE foreign_table (
id integer NOT NULL,
data text,
a bigserial
)
SERVER foreign_server
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
--verify
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
CREATE TABLE parent_for_foreign_tables (
project_id integer
) PARTITION BY HASH (project_id);
CREATE SERVER IF NOT EXISTS srv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
CREATE SERVER IF NOT EXISTS srv2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
CREATE SERVER IF NOT EXISTS srv3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'regression', host 'localhost', port :'master_port');
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (table_name 'dummy');
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (table_name 'dummy');
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv3 OPTIONS (table_name 'dummy');
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
ALTER FOREIGN TABLE foreign_table SET SCHEMA public;
ALTER FOREIGN TABLE public.foreign_table RENAME TO foreign_table_newname;
ALTER FOREIGN TABLE public.foreign_table_newname RENAME COLUMN id TO id_test;
ALTER FOREIGN TABLE public.foreign_table_newname ADD dummy_col bigint NOT NULL DEFAULT 1;
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col DROP DEFAULT;
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col SET DEFAULT 2;
ALTER FOREIGN TABLE public.foreign_table_newname ALTER dummy_col TYPE int;
ALTER TABLE foreign_table_test RENAME COLUMN id TO id_test;
ALTER TABLE foreign_table_test ADD dummy_col int NOT NULL DEFAULT 1;
INSERT INTO public.foreign_table_newname VALUES (2, 'test_2');
INSERT INTO foreign_table_test VALUES (3, 'test_3');
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c check(id_test < 1000);
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint check_c;
ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check(id_test < 1000) NOT VALID;
ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
-- trigger test
CREATE TABLE distributed_table(value int);
SELECT create_distributed_table('distributed_table', 'value');
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
BEGIN
INSERT INTO distributed_table VALUES (42);
RETURN NEW;
END;
$insert_42$ LANGUAGE plpgsql;
CREATE TRIGGER insert_42_trigger
AFTER DELETE ON public.foreign_table_newname
FOR EACH ROW EXECUTE FUNCTION insert_42();
-- do the same pattern from the workers as well
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
select * from distributed_table ORDER BY value;
-- disable trigger
alter foreign table public.foreign_table_newname disable trigger insert_42_trigger;
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
-- should not insert again as trigger disabled
select * from distributed_table ORDER BY value;
DROP TRIGGER insert_42_trigger ON public.foreign_table_newname;
-- should throw errors
select alter_table_set_access_method('public.foreign_table_newname', 'columnar');
select alter_distributed_table('public.foreign_table_newname', shard_count:=4);
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO pg_monitor;
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
ALTER FOREIGN TABLE public.foreign_table_newname OWNER TO postgres;
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table_newname';$$);
\c - - - :worker_1_port
SET search_path TO foreign_tables_schema_mx;
SELECT * FROM public.foreign_table_newname ORDER BY id_test;
SELECT * FROM foreign_table_test ORDER BY id_test;
-- should error out
ALTER FOREIGN TABLE public.foreign_table_newname DROP COLUMN id;
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
\c - - - :master_port
ALTER FOREIGN TABLE foreign_table_newname RENAME TO foreign_table;
SET search_path TO foreign_tables_schema_mx;
ALTER FOREIGN TABLE public.foreign_table SET SCHEMA foreign_tables_schema_mx;
ALTER FOREIGN TABLE IF EXISTS foreign_table RENAME COLUMN id_test TO id;
ALTER TABLE foreign_table_test RENAME COLUMN id_test TO id;
ALTER FOREIGN TABLE foreign_table DROP COLUMN id;
ALTER FOREIGN TABLE foreign_table DROP COLUMN dummy_col;
ALTER TABLE foreign_table_test DROP COLUMN dummy_col;
ALTER FOREIGN TABLE foreign_table OPTIONS (DROP schema_name, SET table_name 'notable');
SELECT run_command_on_workers($$SELECT f.ftoptions FROM pg_foreign_table f JOIN pg_class c ON f.ftrelid=c.oid WHERE c.relname = 'foreign_table';$$);
ALTER FOREIGN TABLE foreign_table OPTIONS (ADD schema_name 'foreign_tables_schema_mx', SET table_name 'foreign_table_test');
SELECT * FROM foreign_table ORDER BY a;
-- test alter user mapping
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'nonexistiniguser');
-- should fail
SELECT * FROM foreign_table ORDER BY a;
ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'postgres');
-- test undistributing
DELETE FROM foreign_table;
SELECT undistribute_table('foreign_table');
SELECT create_distributed_table('foreign_table','data');
SELECT undistribute_table('foreign_table');
SELECT create_reference_table('foreign_table');
SELECT undistribute_table('foreign_table');
INSERT INTO foreign_table_test VALUES (1, 'testt');
SELECT * FROM foreign_table ORDER BY a;
SELECT * FROM foreign_table_test ORDER BY a;
DROP TABLE parent_for_foreign_tables;
CREATE TABLE parent_for_foreign_tables (id integer NOT NULL, data text, a bigserial)
PARTITION BY HASH (id);
CREATE FOREIGN TABLE foreign_partition_1 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 0) SERVER srv1 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
CREATE FOREIGN TABLE foreign_partition_2 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 1) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
SELECT citus_add_local_table_to_metadata('parent_for_foreign_tables');
CREATE FOREIGN TABLE foreign_partition_3 PARTITION OF parent_for_foreign_tables FOR VALUES WITH (modulus 3, remainder 2) SERVER srv2 OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
CREATE USER MAPPING FOR CURRENT_USER
SERVER srv1
OPTIONS (user 'postgres');
CREATE USER MAPPING FOR CURRENT_USER
SERVER srv2
OPTIONS (user 'postgres');
SELECT * FROM parent_for_foreign_tables ORDER BY id;
SELECT * FROM foreign_partition_1 ORDER BY id;
SELECT * FROM foreign_partition_2 ORDER BY id;
SELECT * FROM foreign_partition_3 ORDER BY id;
\c - - - :worker_1_port
SET search_path TO foreign_tables_schema_mx;
SELECT partmethod, repmodel FROM pg_dist_partition
WHERE logicalrelid IN ('parent_for_foreign_tables'::regclass, 'foreign_partition_1'::regclass, 'foreign_partition_2'::regclass, 'foreign_partition_3'::regclass)
ORDER BY logicalrelid;
SELECT * FROM parent_for_foreign_tables ORDER BY id;
SELECT * FROM foreign_partition_1 ORDER BY id;
SELECT * FROM foreign_partition_2 ORDER BY id;
SELECT * FROM foreign_partition_3 ORDER BY id;
\c - - - :master_port
SET search_path TO foreign_tables_schema_mx;
--verify
SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid;
CREATE SERVER foreign_server_local
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', port :'master_port', dbname 'regression');
CREATE USER MAPPING FOR CURRENT_USER
SERVER foreign_server_local
OPTIONS (user 'postgres');
CREATE FOREIGN TABLE foreign_table_local (
id integer NOT NULL,
data text
)
SERVER foreign_server_local
OPTIONS (schema_name 'foreign_tables_schema_mx', table_name 'foreign_table_test');
CREATE TABLE dist_tbl(a int);
INSERT INTO dist_tbl VALUES (1);
SELECT create_distributed_table('dist_tbl','a');
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
CREATE TABLE ref_tbl(a int);
INSERT INTO ref_tbl VALUES (1);
SELECT create_reference_table('ref_tbl');
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
SELECT citus_add_local_table_to_metadata('foreign_table_local');
\c - - - :worker_1_port
SET search_path TO foreign_tables_schema_mx;
SELECT * FROM dist_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
SELECT * FROM ref_tbl d JOIN foreign_table_local f ON d.a=f.id ORDER BY f.id;
\c - - - :master_port
SET search_path TO foreign_tables_schema_mx;
-- should error out because doesn't have a table_name field
CREATE FOREIGN TABLE foreign_table_local_fails (
id integer NOT NULL,
data text
)
SERVER foreign_server_local
OPTIONS (schema_name 'foreign_tables_schema_mx');
DROP FOREIGN TABLE foreign_table_local;
-- cleanup at exit
set client_min_messages to error;
DROP SCHEMA foreign_tables_schema_mx CASCADE;

View File

@ -37,7 +37,7 @@ CREATE OR REPLACE FUNCTION square(a INT) RETURNS INT AS $$
BEGIN
RETURN a*a;
END; $$ LANGUAGE PLPGSQL STABLE;
SELECT create_distributed_function('square(int)');
CREATE TABLE citus_local_table(a int, b int DEFAULT square(10));
SELECT citus_add_local_table_to_metadata('citus_local_table');

View File

@ -12,6 +12,16 @@ SET citus.enable_ddl_propagation TO ON;
CREATE SERVER foreign_server_dependent_schema
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
CREATE FOREIGN TABLE foreign_table (
id integer NOT NULL,
data text
)
SERVER foreign_server_dependent_schema
OPTIONS (schema_name 'test_dependent_schema', table_name 'foreign_table_test');
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId=>0);
SELECT citus_add_local_table_to_metadata('foreign_table');
ALTER TABLE foreign_table OWNER TO pg_monitor;
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
@ -21,12 +31,17 @@ SELECT run_command_on_workers(
SELECT run_command_on_workers(
$$SELECT COUNT(*)=1 FROM pg_foreign_server WHERE srvname = 'foreign_server_dependent_schema';$$);
-- verify the owner is altered on workers
SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c on r.oid=c.relowner where relname = 'foreign_table';$$);
CREATE SERVER foreign_server_to_drop
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test');
--should error
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
DROP FOREIGN TABLE foreign_table;
SELECT citus_remove_node('localhost', :master_port);
SET client_min_messages TO ERROR;
DROP SCHEMA test_dependent_schema CASCADE;

View File

@ -0,0 +1,132 @@
CREATE SCHEMA resync_metadata_with_sequences;
SET search_path TO resync_metadata_with_sequences;
CREATE TABLE test_serial(a bigserial PRIMARY KEY);
SELECT create_distributed_table('test_serial', 'a');
CREATE SEQUENCE myseq;
CREATE TABLE test_sequence(a bigint DEFAULT nextval('myseq'));
SELECT create_distributed_table('test_sequence', 'a');
CREATE SEQUENCE myseq_ref;
CREATE TABLE test_serial_ref(a bigserial PRIMARY KEY, b bigint DEFAULT nextval('myseq_ref'));
SELECT create_reference_table('test_serial_ref');
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
SET client_min_messages TO ERROR;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid:=0);
CREATE SEQUENCE myseq_locl_to_dist;
CREATE TABLE test_local_to_dist(a bigserial PRIMARY KEY, b bigint DEFAULT nextval('myseq_locl_to_dist'));
SELECT citus_add_local_table_to_metadata('test_local_to_dist');
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
SET citus.shard_replication_factor TO 1;
CREATE SEQUENCE other_id_seq;
CREATE TABLE sensors(
measureid bigserial,
other_id bigint DEFAULT nextval('other_id_seq'),
eventdatetime date) PARTITION BY RANGE(eventdatetime);
CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
\c - - - :master_port
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
-- can continue inserting with the existing sequence/serial
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
INSERT INTO test_serial VALUES(DEFAULT) RETURNING *;
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
INSERT into test_sequence VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
INSERT INTO test_serial_ref VALUES(DEFAULT) RETURNING *;
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
\c - - - :master_port
SET search_path tO resync_metadata_with_sequences;
SELECT create_distributed_table('test_local_to_dist', 'a', colocate_with:='none');
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
\c - - - :master_port
SET search_path tO resync_metadata_with_sequences;
SELECT alter_distributed_table('test_local_to_dist', shard_count:=6);
SET citus.shard_replication_factor TO 1;
SELECT alter_distributed_table('sensors', shard_count:=5);
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO test_local_to_dist VALUES(DEFAULT) RETURNING *;
INSERT INTO sensors VALUES (DEFAULT, DEFAULT, '2010-01-01') RETURNING *;
INSERT INTO sensors_news VALUES (DEFAULT, DEFAULT, '2021-01-01') RETURNING *;
\c - - - :master_port
SET search_path tO resync_metadata_with_sequences;
DROP TABLE test_serial, test_sequence;
\c - - - :worker_1_port
SET search_path tO resync_metadata_with_sequences;
-- show that we only have the sequences left after
-- dropping the tables (e.g., bigserial is dropped)
select count(*) from pg_sequences where schemaname ilike '%resync_metadata_with_sequences%';
\c - - - :master_port
SET client_min_messages TO ERROR;
SELECT 1 FROM citus_remove_node('localhost', :master_port);
DROP SCHEMA resync_metadata_with_sequences CASCADE;

View File

@ -67,18 +67,43 @@ SELECT create_distributed_table('test','x');
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
BEGIN;
-- we should not enable MX for this temporary node just because
-- it'd spawn a bg worker targeting this node
-- and that changes the connection count specific tests
-- here
SET LOCAL citus.enable_metadata_sync_by_default TO OFF;
-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
COMMIT;
-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
BEGIN;
-- we should not enable MX for this temporary node just because
-- it'd spawn a bg worker targeting this node
-- and that changes the connection count specific tests
-- here
SET LOCAL citus.enable_metadata_sync_by_default TO OFF;
-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
COMMIT;
-- we don't need this node anymore
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
-- set the coordinator host to something different than localhost
SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');
-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
BEGIN;
-- we should not enable MX for this temporary node just because
-- it'd spawn a bg worker targeting this node
-- and that changes the connection count specific tests
-- here
SET LOCAL citus.enable_metadata_sync_by_default TO OFF;
-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
COMMIT;
-- we don't need this node anymore
SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);
-- set the coordinator host back to localhost for the remainder of tests