mirror of https://github.com/citusdata/citus.git
Merge pull request #4479 from citusdata/alter-table-add-fkey
Convert postgres tables to citus local tables for ALTER TABLE commands defining foreign keyspull/4513/head
commit
4b9285353d
|
@ -369,7 +369,19 @@ ExecuteCascadeOperationForRelationIdList(List *relationIdList,
|
|||
|
||||
case CASCADE_FKEY_CREATE_CITUS_LOCAL_TABLE:
|
||||
{
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
/*
|
||||
* Normally, we wouldn't expect a postgres table connected
|
||||
* to a citus local table via a foreign keys graph. But now
|
||||
* this is possible as we allow foreign keys from postgres
|
||||
* tables to reference tables when coordinator is not added
|
||||
* to metadata. So instead of erroring out, we skip citus
|
||||
* tables here.
|
||||
*/
|
||||
CreateCitusLocalTable(relationId, cascadeViaForeignKeys);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
|
||||
/* placeholder for PreprocessClusterStmt */
|
||||
List *
|
||||
PreprocessClusterStmt(Node *node, const char *clusterCommand)
|
||||
PreprocessClusterStmt(Node *node, const char *clusterCommand,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
ClusterStmt *clusterStmt = castNode(ClusterStmt, node);
|
||||
bool showPropagationWarning = false;
|
||||
|
|
|
@ -220,7 +220,8 @@ FilterNameListForDistributedCollations(List *objects, bool missing_ok,
|
|||
|
||||
|
||||
List *
|
||||
PreprocessDropCollationStmt(Node *node, const char *queryString)
|
||||
PreprocessDropCollationStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *stmt = castNode(DropStmt, node);
|
||||
|
||||
|
@ -290,7 +291,8 @@ PreprocessDropCollationStmt(Node *node, const char *queryString)
|
|||
* the workers to keep the type in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_COLLATION);
|
||||
|
@ -324,7 +326,8 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString)
|
|||
* executed on all the workers to keep the collation in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessRenameCollationStmt(Node *node, const char *queryString)
|
||||
PreprocessRenameCollationStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
RenameStmt *stmt = castNode(RenameStmt, node);
|
||||
ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
||||
|
@ -357,7 +360,8 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString)
|
|||
* In this stage we can prepare the commands that need to be run on all workers.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_COLLATION);
|
||||
|
|
|
@ -239,7 +239,8 @@ AddSchemaFieldIfMissing(CreateExtensionStmt *createExtensionStmt)
|
|||
* be made to the workers.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropExtensionStmt(Node *node, const char *queryString)
|
||||
PreprocessDropExtensionStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *stmt = castNode(DropStmt, node);
|
||||
|
||||
|
@ -386,7 +387,8 @@ ExtensionNameListToObjectAddressList(List *extensionObjectList)
|
|||
* PreprocessAlterExtensionSchemaStmt is invoked for alter extension set schema statements.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
if (!ShouldPropagateExtensionCommand(node))
|
||||
{
|
||||
|
@ -451,7 +453,8 @@ PostprocessAlterExtensionSchemaStmt(Node *node, const char *queryString)
|
|||
* PreprocessAlterExtensionUpdateStmt is invoked for alter extension update statements.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterExtensionStmt *alterExtensionStmt = castNode(AlterExtensionStmt, node);
|
||||
|
||||
|
@ -599,7 +602,8 @@ MarkExistingObjectDependenciesDistributedIfSupported()
|
|||
* PreprocessAlterExtensionContentsStmt issues a notice. It does not propagate.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterExtensionContentsStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterExtensionContentsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
ereport(NOTICE, (errmsg(
|
||||
"Citus does not propagate adding/dropping member objects"),
|
||||
|
|
|
@ -1244,7 +1244,8 @@ ShouldPropagateAlterFunction(const ObjectAddress *address)
|
|||
* can propagate the function in sequential mode.
|
||||
*/
|
||||
List *
|
||||
PreprocessCreateFunctionStmt(Node *node, const char *queryString)
|
||||
PreprocessCreateFunctionStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
CreateFunctionStmt *stmt = castNode(CreateFunctionStmt, node);
|
||||
|
||||
|
@ -1356,7 +1357,8 @@ DefineAggregateStmtObjectAddress(Node *node, bool missing_ok)
|
|||
* the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterFunctionStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterFunctionStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterFunctionStmt *stmt = castNode(AlterFunctionStmt, node);
|
||||
AssertObjectTypeIsFunctional(stmt->objtype);
|
||||
|
@ -1390,7 +1392,8 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString)
|
|||
* types in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessRenameFunctionStmt(Node *node, const char *queryString)
|
||||
PreprocessRenameFunctionStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
RenameStmt *stmt = castNode(RenameStmt, node);
|
||||
AssertObjectTypeIsFunctional(stmt->renameType);
|
||||
|
@ -1421,7 +1424,8 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString)
|
|||
* In this stage we can prepare the commands that need to be run on all workers.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
AssertObjectTypeIsFunctional(stmt->objectType);
|
||||
|
@ -1453,7 +1457,8 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString)
|
|||
* all the workers to keep the type in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
||||
AssertObjectTypeIsFunctional(stmt->objectType);
|
||||
|
@ -1487,7 +1492,8 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString)
|
|||
* functions will still be dropped locally but not on the workers.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropFunctionStmt(Node *node, const char *queryString)
|
||||
PreprocessDropFunctionStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *stmt = castNode(DropStmt, node);
|
||||
List *deletingObjectWithArgsList = stmt->objects;
|
||||
|
@ -1591,7 +1597,8 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString)
|
|||
* don't allow this dependency to be created.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterFunctionDependsStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterFunctionDependsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectDependsStmt *stmt = castNode(AlterObjectDependsStmt, node);
|
||||
AssertObjectTypeIsFunctional(stmt->objectType);
|
||||
|
|
|
@ -13,7 +13,8 @@
|
|||
|
||||
/* placeholder for PreprocessGrantStmt */
|
||||
List *
|
||||
PreprocessGrantStmt(Node *node, const char *queryString)
|
||||
PreprocessGrantStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
|
|
@ -130,7 +130,8 @@ IsIndexRenameStmt(RenameStmt *renameStmt)
|
|||
* in a List. If no distributed table is involved, this function returns NIL.
|
||||
*/
|
||||
List *
|
||||
PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
||||
PreprocessIndexStmt(Node *node, const char *createIndexCommand,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
IndexStmt *createIndexStatement = castNode(IndexStmt, node);
|
||||
|
||||
|
@ -528,7 +529,8 @@ GetCreateIndexRelationLockMode(IndexStmt *createIndexStatement)
|
|||
* in a List. If no distributed table is involved, this function returns NIL.
|
||||
*/
|
||||
List *
|
||||
PreprocessReindexStmt(Node *node, const char *reindexCommand)
|
||||
PreprocessReindexStmt(Node *node, const char *reindexCommand,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
ReindexStmt *reindexStatement = castNode(ReindexStmt, node);
|
||||
List *ddlJobs = NIL;
|
||||
|
@ -637,7 +639,8 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand)
|
|||
* in a List. If no distributed table is involved, this function returns NIL.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand)
|
||||
PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *dropIndexStatement = castNode(DropStmt, node);
|
||||
List *ddlJobs = NIL;
|
||||
|
|
|
@ -28,7 +28,8 @@ CreatePolicyCommands(Oid relationId)
|
|||
|
||||
/* placeholder for PreprocessCreatePolicyStmt */
|
||||
List *
|
||||
PreprocessCreatePolicyStmt(Node *node, const char *queryString)
|
||||
PreprocessCreatePolicyStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
CreatePolicyStmt *stmt = castNode(CreatePolicyStmt, node);
|
||||
Oid relationId = RangeVarGetRelid(stmt->table,
|
||||
|
@ -48,7 +49,8 @@ PreprocessCreatePolicyStmt(Node *node, const char *queryString)
|
|||
|
||||
/* placeholder for PreprocessAlterPolicyStmt */
|
||||
List *
|
||||
PreprocessAlterPolicyStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterPolicyStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
/* placeholder for future implementation */
|
||||
return NIL;
|
||||
|
@ -71,7 +73,8 @@ ErrorIfUnsupportedPolicy(Relation relation)
|
|||
|
||||
/* placeholder for PreprocessDropPolicyStmt */
|
||||
List *
|
||||
PreprocessDropPolicyStmt(Node *node, const char *queryString)
|
||||
PreprocessDropPolicyStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
/* placeholder for future implementation */
|
||||
return NIL;
|
||||
|
|
|
@ -26,7 +26,8 @@
|
|||
* tributed table is involved, this function returns NIL.
|
||||
*/
|
||||
List *
|
||||
PreprocessRenameStmt(Node *node, const char *renameCommand)
|
||||
PreprocessRenameStmt(Node *node, const char *renameCommand,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
RenameStmt *renameStmt = castNode(RenameStmt, node);
|
||||
Oid objectRelationId = InvalidOid; /* SQL Object OID */
|
||||
|
@ -144,7 +145,8 @@ ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt)
|
|||
* a specialized implementation if present, otherwise return an empty list for its DDLJobs
|
||||
*/
|
||||
List *
|
||||
PreprocessRenameAttributeStmt(Node *node, const char *queryString)
|
||||
PreprocessRenameAttributeStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
RenameStmt *stmt = castNode(RenameStmt, node);
|
||||
Assert(stmt->renameType == OBJECT_ATTRIBUTE);
|
||||
|
@ -153,7 +155,8 @@ PreprocessRenameAttributeStmt(Node *node, const char *queryString)
|
|||
{
|
||||
case OBJECT_TYPE:
|
||||
{
|
||||
return PreprocessRenameTypeAttributeStmt(node, queryString);
|
||||
return PreprocessRenameTypeAttributeStmt(node, queryString,
|
||||
processUtilityContext);
|
||||
}
|
||||
|
||||
default:
|
||||
|
|
|
@ -181,7 +181,8 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
|
|||
* role set statement.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterRoleSetStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterRoleSetStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
if (!ShouldPropagate())
|
||||
{
|
||||
|
|
|
@ -49,7 +49,8 @@ static void EnsureSequentialModeForSchemaDDL(void);
|
|||
* under dropped schema involved in any foreign key relationship.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessDropSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *dropStatement = castNode(DropStmt, node);
|
||||
Relation pgClass = NULL;
|
||||
|
@ -133,7 +134,8 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString)
|
|||
* on schemas. Only grant statements for distributed schema are propagated.
|
||||
*/
|
||||
List *
|
||||
PreprocessGrantOnSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessGrantOnSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
GrantStmt *stmt = castNode(GrantStmt, node);
|
||||
Assert(stmt->objtype == OBJECT_SCHEMA);
|
||||
|
@ -166,7 +168,8 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString)
|
|||
* is executed on all the workers to keep the schemas in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
ObjectAddress schemaAddress = GetObjectAddressFromParseTree(node, false);
|
||||
if (!ShouldPropagateObject(&schemaAddress))
|
||||
|
|
|
@ -65,7 +65,8 @@ static char * CreateAlterCommandIfTargetNotDefault(Oid statsOid);
|
|||
* CREATE STATISTICS.
|
||||
*/
|
||||
List *
|
||||
PreprocessCreateStatisticsStmt(Node *node, const char *queryString)
|
||||
PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
CreateStatsStmt *stmt = castNode(CreateStatsStmt, node);
|
||||
|
||||
|
@ -150,7 +151,8 @@ CreateStatisticsStmtObjectAddress(Node *node, bool missingOk)
|
|||
* DROP STATISTICS.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropStatisticsStmt(Node *node, const char *queryString)
|
||||
PreprocessDropStatisticsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *dropStatisticsStmt = castNode(DropStmt, node);
|
||||
Assert(dropStatisticsStmt->removeType == OBJECT_STATISTIC_EXT);
|
||||
|
@ -208,7 +210,8 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString)
|
|||
* ALTER STATISTICS RENAME.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
RenameStmt *renameStmt = castNode(RenameStmt, node);
|
||||
Assert(renameStmt->renameType == OBJECT_STATISTIC_EXT);
|
||||
|
@ -246,7 +249,8 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString)
|
|||
* ALTER STATISTICS SET SCHEMA.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_STATISTIC_EXT);
|
||||
|
@ -338,7 +342,8 @@ AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk)
|
|||
* ALTER STATISTICS .. SET STATISTICS.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterStatisticsStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterStatisticsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterStatsStmt *stmt = castNode(AlterStatsStmt, node);
|
||||
|
||||
|
@ -377,7 +382,8 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString)
|
|||
* ALTER STATISTICS .. OWNER TO.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_STATISTIC_EXT);
|
||||
|
|
|
@ -45,10 +45,23 @@
|
|||
/* Local functions forward declarations for unsupported command checks */
|
||||
static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement,
|
||||
const char *queryString);
|
||||
static void ErrorIfAlterTableDefinesFKeyFromPostgresToCitusLocalTable(
|
||||
static bool AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(
|
||||
AlterTableStmt *alterTableStatement);
|
||||
static List * GetAlterTableStmtFKeyConstraintList(AlterTableStmt *alterTableStatement);
|
||||
static bool RelationIdListContainsCitusTableType(List *relationIdList,
|
||||
CitusTableType citusTableType);
|
||||
static bool RelationIdListContainsPostgresTable(List *relationIdList);
|
||||
static void ConvertPostgresLocalTablesToCitusLocalTables(
|
||||
AlterTableStmt *alterTableStatement);
|
||||
static int CompareRangeVarsByOid(const void *leftElement, const void *rightElement);
|
||||
static List * GetAlterTableAddFKeyRightRelationIdList(
|
||||
AlterTableStmt *alterTableStatement);
|
||||
static List * GetAlterTableAddFKeyRightRelationRangeVarList(
|
||||
AlterTableStmt *alterTableStatement);
|
||||
static List * GetAlterTableAddFKeyConstraintList(AlterTableStmt *alterTableStatement);
|
||||
static List * GetAlterTableCommandFKeyConstraintList(AlterTableCmd *command);
|
||||
static List * GetRangeVarListFromFKeyConstraintList(List *fKeyConstraintList);
|
||||
static List * GetRelationIdListFromRangeVarList(List *rangeVarList, LOCKMODE lockmode,
|
||||
bool missingOk);
|
||||
static bool AlterTableCommandTypeIsTrigger(AlterTableType alterTableType);
|
||||
static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
|
||||
static void ErrorIfCitusLocalTablePartitionCommand(AlterTableCmd *alterTableCmd,
|
||||
|
@ -87,7 +100,8 @@ static bool SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *comma
|
|||
* about not processing same DROP command twice.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropTableStmt(Node *node, const char *queryString)
|
||||
PreprocessDropTableStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *dropTableStatement = castNode(DropStmt, node);
|
||||
|
||||
|
@ -153,7 +167,7 @@ PreprocessDropTableStmt(Node *node, const char *queryString)
|
|||
/*
|
||||
* PostprocessCreateTableStmt takes CreateStmt object as a parameter and errors
|
||||
* out if it creates a table with a foreign key that references to a citus local
|
||||
* table if pg version is older than 13 (see comment in function).
|
||||
* table.
|
||||
*
|
||||
* This function also processes CREATE TABLE ... PARTITION OF statements via
|
||||
* PostprocessCreateTableStmtPartitionOf function.
|
||||
|
@ -161,17 +175,6 @@ PreprocessDropTableStmt(Node *node, const char *queryString)
|
|||
void
|
||||
PostprocessCreateTableStmt(CreateStmt *createStatement, const char *queryString)
|
||||
{
|
||||
#if PG_VERSION_NUM < PG_VERSION_13
|
||||
|
||||
/*
|
||||
* Postgres processes foreign key constraints implied by CREATE TABLE
|
||||
* commands by internally executing ALTER TABLE commands via standard
|
||||
* process utility starting from PG13. Hence, we will already perform
|
||||
* unsupported foreign key checks via PreprocessAlterTableStmt function
|
||||
* in PG13. But for the older version, we need to do unsupported foreign
|
||||
* key checks here.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Relation must exist and it is already locked as standard process utility
|
||||
* is already executed.
|
||||
|
@ -182,7 +185,6 @@ PostprocessCreateTableStmt(CreateStmt *createStatement, const char *queryString)
|
|||
{
|
||||
ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(relationId);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (createStatement->inhRelations != NIL && createStatement->partbound != NULL)
|
||||
{
|
||||
|
@ -346,7 +348,8 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
|
|||
* function returns NIL.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
|
||||
PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterTableStmt *alterTableStatement = castNode(AlterTableStmt, node);
|
||||
|
||||
|
@ -376,14 +379,38 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
|
|||
leftRelationId = IndexGetRelation(leftRelationId, missingOk);
|
||||
}
|
||||
|
||||
if (processUtilityContext != PROCESS_UTILITY_SUBCOMMAND &&
|
||||
AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(alterTableStatement) &&
|
||||
CoordinatorAddedAsWorkerNode())
|
||||
{
|
||||
/*
|
||||
* Normally, we would do this check in ErrorIfUnsupportedForeignConstraintExists
|
||||
* in post process step. However, we skip doing error checks in post process if
|
||||
* this pre process returns NIL -and this method returns NIL if the left relation
|
||||
* is a postgres table. So, we need to error out for foreign keys from postgres
|
||||
* tables to citus local tables here.
|
||||
* We don't process subcommands generated by postgres.
|
||||
* This is mainly because postgres started to issue ALTER TABLE commands
|
||||
* for some set of objects that are defined via CREATE TABLE commands as
|
||||
* of pg13. However, citus already has a separate logic for CREATE TABLE
|
||||
* commands.
|
||||
*
|
||||
* To support foreign keys from/to postgres local tables to/from reference
|
||||
* or citus local tables, we convert given postgres local table -and the
|
||||
* other postgres tables that it is connected via a fkey graph- to a citus
|
||||
* local table.
|
||||
*
|
||||
* Note that we don't convert postgres tables to citus local tables if
|
||||
* coordinator is not added to metadata as CreateCitusLocalTable requires
|
||||
* this. In this case, we assume user is about to create reference or
|
||||
* distributed table from local table and we don't want to break user
|
||||
* experience by asking to add coordinator to metadata.
|
||||
*/
|
||||
ErrorIfAlterTableDefinesFKeyFromPostgresToCitusLocalTable(alterTableStatement);
|
||||
ConvertPostgresLocalTablesToCitusLocalTables(alterTableStatement);
|
||||
|
||||
/*
|
||||
* CreateCitusLocalTable converts relation to a shard relation and creates
|
||||
* shell table from scratch.
|
||||
* For this reason we should re-enter to PreprocessAlterTableStmt to operate
|
||||
* on shell table relation id.
|
||||
*/
|
||||
return PreprocessAlterTableStmt(node, alterTableCommand, processUtilityContext);
|
||||
}
|
||||
|
||||
bool referencingIsLocalTable = !IsCitusTable(leftRelationId);
|
||||
if (referencingIsLocalTable)
|
||||
|
@ -593,59 +620,249 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
|
|||
|
||||
|
||||
/*
|
||||
* ErrorIfAlterTableDefinesFKeyFromPostgresToCitusLocalTable errors out if
|
||||
* given ALTER TABLE statement defines foreign key from a postgres local table
|
||||
* to a citus local table.
|
||||
* AlterTableDefinesFKeyBetweenPostgresAndNonDistTable returns true if given
|
||||
* alter table command defines foreign key between a postgres table and a
|
||||
* reference or citus local table.
|
||||
*/
|
||||
static bool
|
||||
AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
List *foreignKeyConstraintList =
|
||||
GetAlterTableAddFKeyConstraintList(alterTableStatement);
|
||||
if (list_length(foreignKeyConstraintList) == 0)
|
||||
{
|
||||
/* we are not defining any foreign keys */
|
||||
return false;
|
||||
}
|
||||
|
||||
List *rightRelationIdList =
|
||||
GetAlterTableAddFKeyRightRelationIdList(alterTableStatement);
|
||||
|
||||
LOCKMODE lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
|
||||
Oid leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
||||
if (!IsCitusTable(leftRelationId))
|
||||
{
|
||||
return RelationIdListContainsCitusTableType(rightRelationIdList,
|
||||
CITUS_TABLE_WITH_NO_DIST_KEY);
|
||||
}
|
||||
else if (IsCitusTableType(leftRelationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||
{
|
||||
return RelationIdListContainsPostgresTable(rightRelationIdList);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RelationIdListContainsCitusTableType returns true if given relationIdList
|
||||
* contains a citus table with given type.
|
||||
*/
|
||||
static bool
|
||||
RelationIdListContainsCitusTableType(List *relationIdList, CitusTableType citusTableType)
|
||||
{
|
||||
Oid relationId = InvalidOid;
|
||||
foreach_oid(relationId, relationIdList)
|
||||
{
|
||||
if (IsCitusTableType(relationId, citusTableType))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RelationIdListContainsPostgresTable returns true if given relationIdList
|
||||
* contains a postgres table.
|
||||
*/
|
||||
static bool
|
||||
RelationIdListContainsPostgresTable(List *relationIdList)
|
||||
{
|
||||
Oid relationId = InvalidOid;
|
||||
foreach_oid(relationId, relationIdList)
|
||||
{
|
||||
if (OidIsValid(relationId) && !IsCitusTable(relationId))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConvertPostgresLocalTablesToCitusLocalTables converts each postgres table
|
||||
* involved in foreign keys to be defined by given alter table command and the
|
||||
* other tables connected to them via a foreign key graph to citus local tables.
|
||||
*/
|
||||
static void
|
||||
ErrorIfAlterTableDefinesFKeyFromPostgresToCitusLocalTable(
|
||||
AlterTableStmt *alterTableStatement)
|
||||
ConvertPostgresLocalTablesToCitusLocalTables(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
List *rightRelationRangeVarList =
|
||||
GetAlterTableAddFKeyRightRelationRangeVarList(alterTableStatement);
|
||||
RangeVar *leftRelationRangeVar = alterTableStatement->relation;
|
||||
List *relationRangeVarList = lappend(rightRelationRangeVarList, leftRelationRangeVar);
|
||||
|
||||
/*
|
||||
* To prevent deadlocks, sort the list before converting each postgres local
|
||||
* table to a citus local table.
|
||||
*/
|
||||
relationRangeVarList = SortList(relationRangeVarList, CompareRangeVarsByOid);
|
||||
|
||||
/*
|
||||
* Here we should operate on RangeVar objects since relations oid's would
|
||||
* change in below loop due to CreateCitusLocalTable.
|
||||
*/
|
||||
RangeVar *relationRangeVar;
|
||||
foreach_ptr(relationRangeVar, relationRangeVarList)
|
||||
{
|
||||
List *commandList = alterTableStatement->cmds;
|
||||
|
||||
LOCKMODE lockmode = AlterTableGetLockLevel(commandList);
|
||||
Oid leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
||||
|
||||
if (IsCitusTable(leftRelationId))
|
||||
LOCKMODE lockMode = AlterTableGetLockLevel(commandList);
|
||||
bool missingOk = alterTableStatement->missing_ok;
|
||||
Oid relationId = RangeVarGetRelid(relationRangeVar, lockMode, missingOk);
|
||||
if (!OidIsValid(relationId))
|
||||
{
|
||||
/* left relation is not a postgres local table, */
|
||||
return;
|
||||
/*
|
||||
* As we are in preprocess, missingOk might be true and relation
|
||||
* might not exist.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
else if (IsCitusTable(relationId))
|
||||
{
|
||||
/*
|
||||
* relationRangeVarList has also reference and citus local tables
|
||||
* involved in this ADD FOREIGN KEY command. Moreover, even if
|
||||
* relationId was belonging to a postgres local table initially,
|
||||
* we might had already converted it to a citus local table by cascading.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
List *alterTableFKeyConstraints =
|
||||
GetAlterTableStmtFKeyConstraintList(alterTableStatement);
|
||||
Constraint *constraint = NULL;
|
||||
foreach_ptr(constraint, alterTableFKeyConstraints)
|
||||
/*
|
||||
* The only reason behind using a try/catch block here is giving a proper
|
||||
* error message. For example, when creating a citus local table we might
|
||||
* give an error telling that partitioned tables are not supported for
|
||||
* citus local table creation. But as a user it wouldn't make much sense
|
||||
* to see such an error. So here we extend error message to tell that we
|
||||
* actually ended up with this error when trying to define the foreign key.
|
||||
*
|
||||
* Also, as CopyErrorData() requires (CurrentMemoryContext != ErrorContext),
|
||||
* so we store CurrentMemoryContext here.
|
||||
*/
|
||||
MemoryContext savedMemoryContext = CurrentMemoryContext;
|
||||
PG_TRY();
|
||||
{
|
||||
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, lockmode,
|
||||
alterTableStatement->missing_ok);
|
||||
if (IsCitusTableType(rightRelationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(leftRelationId);
|
||||
bool cascade = true;
|
||||
CreateCitusLocalTable(relationId, cascade);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
MemoryContextSwitchTo(savedMemoryContext);
|
||||
|
||||
ErrorData *errorData = CopyErrorData();
|
||||
FlushErrorState();
|
||||
|
||||
if (errorData->elevel != ERROR)
|
||||
{
|
||||
PG_RE_THROW();
|
||||
}
|
||||
|
||||
/* override error detail */
|
||||
errorData->detail = "When adding a foreign key from a local table to "
|
||||
"a reference table, Citus applies a conversion to "
|
||||
"all the local tables in the foreign key graph";
|
||||
ThrowErrorData(errorData);
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAlterTableStmtFKeyConstraintList returns a list of Constraint objects for
|
||||
* the foreign keys that given ALTER TABLE statement defines.
|
||||
* CompareRangeVarsByOid is a comparison function to sort RangeVar object list.
|
||||
*/
|
||||
static int
|
||||
CompareRangeVarsByOid(const void *leftElement, const void *rightElement)
|
||||
{
|
||||
RangeVar *leftRangeVar = *((RangeVar **) leftElement);
|
||||
RangeVar *rightRangeVar = *((RangeVar **) rightElement);
|
||||
|
||||
/*
|
||||
* Any way we will check their existence, so it's okay to map non-existing
|
||||
* relations to InvalidOid when sorting.
|
||||
*/
|
||||
bool missingOk = true;
|
||||
|
||||
/*
|
||||
* As this is an object comparator function, there is no way to understand
|
||||
* proper lock mode. So assume caller already locked relations.
|
||||
*/
|
||||
LOCKMODE lockMode = NoLock;
|
||||
|
||||
Oid leftRelationId = RangeVarGetRelid(leftRangeVar, lockMode, missingOk);
|
||||
Oid rightRelationId = RangeVarGetRelid(rightRangeVar, lockMode, missingOk);
|
||||
return CompareOids(&leftRelationId, &rightRelationId);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAlterTableAddFKeyRightRelationIdList returns a list of oid's for right
|
||||
* relations involved in foreign keys to be defined by given ALTER TABLE command.
|
||||
*/
|
||||
static List *
|
||||
GetAlterTableStmtFKeyConstraintList(AlterTableStmt *alterTableStatement)
|
||||
GetAlterTableAddFKeyRightRelationIdList(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
List *alterTableFKeyConstraintList = NIL;
|
||||
List *rightRelationRangeVarList =
|
||||
GetAlterTableAddFKeyRightRelationRangeVarList(alterTableStatement);
|
||||
List *commandList = alterTableStatement->cmds;
|
||||
LOCKMODE lockMode = AlterTableGetLockLevel(commandList);
|
||||
bool missingOk = alterTableStatement->missing_ok;
|
||||
List *rightRelationIdList =
|
||||
GetRelationIdListFromRangeVarList(rightRelationRangeVarList, lockMode, missingOk);
|
||||
return rightRelationIdList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAlterTableAddFKeyRightRelationRangeVarList returns a list of RangeVar
|
||||
* objects for right relations involved in foreign keys to be defined by
|
||||
* given ALTER TABLE command.
|
||||
*/
|
||||
static List *
|
||||
GetAlterTableAddFKeyRightRelationRangeVarList(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
List *fKeyConstraintList = GetAlterTableAddFKeyConstraintList(alterTableStatement);
|
||||
List *rightRelationRangeVarList =
|
||||
GetRangeVarListFromFKeyConstraintList(fKeyConstraintList);
|
||||
return rightRelationRangeVarList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAlterTableAddFKeyConstraintList returns a list of Constraint objects for
|
||||
* foreign keys that given ALTER TABLE to be defined by given ALTER TABLE command.
|
||||
*/
|
||||
static List *
|
||||
GetAlterTableAddFKeyConstraintList(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
List *foreignKeyConstraintList = NIL;
|
||||
|
||||
List *commandList = alterTableStatement->cmds;
|
||||
AlterTableCmd *command = NULL;
|
||||
foreach_ptr(command, commandList)
|
||||
{
|
||||
List *commandFKeyConstraintList = GetAlterTableCommandFKeyConstraintList(command);
|
||||
alterTableFKeyConstraintList = list_concat(alterTableFKeyConstraintList,
|
||||
commandFKeyConstraintList);
|
||||
List *commandForeignKeyConstraintList =
|
||||
GetAlterTableCommandFKeyConstraintList(command);
|
||||
foreignKeyConstraintList = list_concat(foreignKeyConstraintList,
|
||||
commandForeignKeyConstraintList);
|
||||
}
|
||||
|
||||
return alterTableFKeyConstraintList;
|
||||
return foreignKeyConstraintList;
|
||||
}
|
||||
|
||||
|
||||
|
@ -690,6 +907,47 @@ GetAlterTableCommandFKeyConstraintList(AlterTableCmd *command)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetRangeVarListFromFKeyConstraintList returns a list of RangeVar objects for
|
||||
* right relations in fKeyConstraintList.
|
||||
*/
|
||||
static List *
|
||||
GetRangeVarListFromFKeyConstraintList(List *fKeyConstraintList)
|
||||
{
|
||||
List *rightRelationRangeVarList = NIL;
|
||||
|
||||
Constraint *fKeyConstraint = NULL;
|
||||
foreach_ptr(fKeyConstraint, fKeyConstraintList)
|
||||
{
|
||||
RangeVar *rightRelationRangeVar = fKeyConstraint->pktable;
|
||||
rightRelationRangeVarList = lappend(rightRelationRangeVarList,
|
||||
rightRelationRangeVar);
|
||||
}
|
||||
|
||||
return rightRelationRangeVarList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetRelationIdListFromRangeVarList returns relation id list for relations
|
||||
* identified by RangeVar objects in given list.
|
||||
*/
|
||||
static List *
|
||||
GetRelationIdListFromRangeVarList(List *rangeVarList, LOCKMODE lockMode, bool missingOk)
|
||||
{
|
||||
List *relationIdList = NIL;
|
||||
|
||||
RangeVar *rangeVar = NULL;
|
||||
foreach_ptr(rangeVar, rangeVarList)
|
||||
{
|
||||
Oid rightRelationId = RangeVarGetRelid(rangeVar, lockMode, missingOk);
|
||||
relationIdList = lappend_oid(relationIdList, rightRelationId);
|
||||
}
|
||||
|
||||
return relationIdList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AlterTableCommandTypeIsTrigger returns true if given alter table command type
|
||||
* is identifies an ALTER TABLE .. TRIGGER .. command.
|
||||
|
@ -724,7 +982,8 @@ AlterTableCommandTypeIsTrigger(AlterTableType alterTableType)
|
|||
* the distributed environment. We warn out here.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
ereport(WARNING, (errmsg("not propagating ALTER TABLE ALL IN TABLESPACE "
|
||||
"commands to worker nodes"),
|
||||
|
@ -743,7 +1002,8 @@ PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString)
|
|||
* shards.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterTableSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_TABLE);
|
||||
|
|
|
@ -483,7 +483,8 @@ GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDepen
|
|||
* standard process utility.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropTriggerStmt(Node *node, const char *queryString)
|
||||
PreprocessDropTriggerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *dropTriggerStmt = castNode(DropStmt, node);
|
||||
Assert(dropTriggerStmt->removeType == OBJECT_TRIGGER);
|
||||
|
|
|
@ -116,7 +116,8 @@ static bool ShouldPropagateTypeCreate(void);
|
|||
* access to the ObjectAddress of the new type.
|
||||
*/
|
||||
List *
|
||||
PreprocessCompositeTypeStmt(Node *node, const char *queryString)
|
||||
PreprocessCompositeTypeStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
if (!ShouldPropagateTypeCreate())
|
||||
{
|
||||
|
@ -201,7 +202,8 @@ PostprocessCompositeTypeStmt(Node *node, const char *queryString)
|
|||
* this is already implemented by the post processing for adding columns to tables.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterTypeStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterTypeStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
||||
Assert(stmt->relkind == OBJECT_TYPE);
|
||||
|
@ -244,7 +246,8 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString)
|
|||
* ObjectAddress for the new type just yet.
|
||||
*/
|
||||
List *
|
||||
PreprocessCreateEnumStmt(Node *node, const char *queryString)
|
||||
PreprocessCreateEnumStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
if (!ShouldPropagateTypeCreate())
|
||||
{
|
||||
|
@ -316,7 +319,8 @@ PostprocessCreateEnumStmt(Node *node, const char *queryString)
|
|||
* workers directly to keep the types in sync accross the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterEnumStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterEnumStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
List *commands = NIL;
|
||||
|
||||
|
@ -446,7 +450,8 @@ PostprocessAlterEnumStmt(Node *node, const char *queryString)
|
|||
* no types in the drop list are distributed no calls will be made to the workers.
|
||||
*/
|
||||
List *
|
||||
PreprocessDropTypeStmt(Node *node, const char *queryString)
|
||||
PreprocessDropTypeStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
DropStmt *stmt = castNode(DropStmt, node);
|
||||
|
||||
|
@ -514,7 +519,8 @@ PreprocessDropTypeStmt(Node *node, const char *queryString)
|
|||
* executed on all the workers to keep the types in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessRenameTypeStmt(Node *node, const char *queryString)
|
||||
PreprocessRenameTypeStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
|
||||
if (!ShouldPropagateObject(&typeAddress))
|
||||
|
@ -547,7 +553,8 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString)
|
|||
* keep the type in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString)
|
||||
PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
RenameStmt *stmt = castNode(RenameStmt, node);
|
||||
Assert(stmt->renameType == OBJECT_ATTRIBUTE);
|
||||
|
@ -579,7 +586,8 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString)
|
|||
* In this stage we can prepare the commands that need to be run on all workers.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_TYPE);
|
||||
|
@ -637,7 +645,8 @@ PostprocessAlterTypeSchemaStmt(Node *node, const char *queryString)
|
|||
* the workers to keep the type in sync across the cluster.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString)
|
||||
PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
||||
Assert(stmt->objectType == OBJECT_TYPE);
|
||||
|
|
|
@ -384,7 +384,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
|
||||
if (ops && ops->preprocess)
|
||||
{
|
||||
ddlJobs = ops->preprocess(parsetree, queryString);
|
||||
ddlJobs = ops->preprocess(parsetree, queryString, context);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "utils/rel.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "tcop/dest.h"
|
||||
#include "tcop/utility.h"
|
||||
|
||||
/*
|
||||
* DistributeObjectOps specifies handlers for node/object type pairs.
|
||||
|
@ -39,7 +40,7 @@ typedef struct DistributeObjectOps
|
|||
{
|
||||
char * (*deparse)(Node *);
|
||||
void (*qualify)(Node *);
|
||||
List * (*preprocess)(Node *, const char *);
|
||||
List * (*preprocess)(Node *, const char *, ProcessUtilityContext);
|
||||
List * (*postprocess)(Node *, const char *);
|
||||
ObjectAddress (*address)(Node *, bool);
|
||||
} DistributeObjectOps;
|
||||
|
@ -102,7 +103,8 @@ typedef enum SearchForeignKeyColumnFlags
|
|||
|
||||
|
||||
/* cluster.c - forward declarations */
|
||||
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand);
|
||||
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
||||
/* index.c */
|
||||
typedef void (*PGIndexProcessor)(Form_pg_index, List **);
|
||||
|
@ -116,10 +118,16 @@ extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *d
|
|||
extern char * CreateCollationDDL(Oid collationId);
|
||||
extern List * CreateCollationDDLsIdempotent(Oid collationId);
|
||||
extern ObjectAddress AlterCollationOwnerObjectAddress(Node *stmt, bool missing_ok);
|
||||
extern List * PreprocessDropCollationStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterCollationOwnerStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessRenameCollationStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessDropCollationStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterCollationOwnerStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * PreprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * PreprocessRenameCollationStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern ObjectAddress RenameCollationStmtObjectAddress(Node *stmt, bool missing_ok);
|
||||
extern ObjectAddress AlterCollationSchemaStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
|
@ -133,16 +141,23 @@ extern bool IsDropCitusExtensionStmt(Node *parsetree);
|
|||
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);
|
||||
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
|
||||
extern List * PostprocessCreateExtensionStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessDropExtensionStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessDropExtensionStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterExtensionSchemaStmt(Node *stmt,
|
||||
const char *queryString);
|
||||
const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * PostprocessAlterExtensionSchemaStmt(Node *stmt,
|
||||
const char *queryString);
|
||||
extern List * PreprocessAlterExtensionUpdateStmt(Node *stmt,
|
||||
const char *queryString);
|
||||
const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern void PostprocessAlterExtensionCitusUpdateStmt(Node *node);
|
||||
extern List * PreprocessAlterExtensionContentsStmt(Node *node,
|
||||
const char *queryString);
|
||||
const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * CreateExtensionDDLCommand(const ObjectAddress *extensionAddress);
|
||||
extern ObjectAddress AlterExtensionSchemaStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
|
@ -182,51 +197,64 @@ extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId);
|
|||
|
||||
|
||||
/* function.c - forward declarations */
|
||||
extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessCreateFunctionStmt(Node *stmt,
|
||||
const char *queryString);
|
||||
extern ObjectAddress CreateFunctionStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
extern ObjectAddress DefineAggregateStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
extern List * PreprocessAlterFunctionStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterFunctionStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern ObjectAddress AlterFunctionStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
extern List * PreprocessRenameFunctionStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessRenameFunctionStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern ObjectAddress RenameFunctionStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
extern List * PreprocessAlterFunctionOwnerStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterFunctionOwnerStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern ObjectAddress AlterFunctionOwnerObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
extern List * PreprocessAlterFunctionSchemaStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterFunctionSchemaStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern ObjectAddress AlterFunctionSchemaStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
extern List * PostprocessAlterFunctionSchemaStmt(Node *stmt,
|
||||
const char *queryString);
|
||||
extern List * PreprocessDropFunctionStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessDropFunctionStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterFunctionDependsStmt(Node *stmt,
|
||||
const char *queryString);
|
||||
const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern ObjectAddress AlterFunctionDependsStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
|
||||
|
||||
/* grant.c - forward declarations */
|
||||
extern List * PreprocessGrantStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessGrantStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
||||
|
||||
/* index.c - forward declarations */
|
||||
extern bool IsIndexRenameStmt(RenameStmt *renameStmt);
|
||||
extern List * PreprocessIndexStmt(Node *createIndexStatement,
|
||||
const char *createIndexCommand);
|
||||
const char *createIndexCommand,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern char * ChooseIndexName(const char *tabname, Oid namespaceId,
|
||||
List *colnames, List *exclusionOpNames,
|
||||
bool primary, bool isconstraint);
|
||||
extern char * ChooseIndexNameAddition(List *colnames);
|
||||
extern List * ChooseIndexColumnNames(List *indexElems);
|
||||
extern List * PreprocessReindexStmt(Node *ReindexStatement,
|
||||
const char *ReindexCommand);
|
||||
const char *ReindexCommand,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessDropIndexStmt(Node *dropIndexStatement,
|
||||
const char *dropIndexCommand);
|
||||
const char *dropIndexCommand,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessIndexStmt(Node *node,
|
||||
const char *queryString);
|
||||
extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement);
|
||||
|
@ -241,9 +269,12 @@ extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_o
|
|||
/* policy.c - forward declarations */
|
||||
extern List * CreatePolicyCommands(Oid relationId);
|
||||
extern void ErrorIfUnsupportedPolicy(Relation relation);
|
||||
extern List * PreprocessCreatePolicyStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterPolicyStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessDropPolicyStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessCreatePolicyStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterPolicyStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessDropPolicyStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern bool IsPolicyRenameStmt(RenameStmt *stmt);
|
||||
extern void CreatePolicyEventExtendNames(CreatePolicyStmt *stmt, const char *schemaName,
|
||||
uint64 shardId);
|
||||
|
@ -256,14 +287,17 @@ extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, u
|
|||
|
||||
|
||||
/* rename.c - forward declarations*/
|
||||
extern List * PreprocessRenameStmt(Node *renameStmt, const char *renameCommand);
|
||||
extern List * PreprocessRenameStmt(Node *renameStmt, const char *renameCommand,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt);
|
||||
extern List * PreprocessRenameAttributeStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessRenameAttributeStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
||||
|
||||
/* role.c - forward declarations*/
|
||||
extern List * PostprocessAlterRoleStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterRoleSetStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterRoleSetStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * GenerateAlterRoleSetCommandForRole(Oid roleid);
|
||||
extern ObjectAddress AlterRoleStmtObjectAddress(Node *node,
|
||||
bool missing_ok);
|
||||
|
@ -273,11 +307,14 @@ extern List * GenerateCreateOrAlterRoleCommand(Oid roleOid);
|
|||
|
||||
/* schema.c - forward declarations */
|
||||
extern List * PreprocessDropSchemaStmt(Node *dropSchemaStatement,
|
||||
const char *queryString);
|
||||
const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt,
|
||||
const char *alterObjectSchemaCommand);
|
||||
extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok);
|
||||
|
||||
/* sequence.c - forward declarations */
|
||||
|
@ -285,16 +322,25 @@ extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
|||
extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
||||
|
||||
/* statistics.c - forward declarations */
|
||||
extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessCreateStatisticsStmt(Node *node, const char *queryString);
|
||||
extern ObjectAddress CreateStatisticsStmtObjectAddress(Node *node, bool missingOk);
|
||||
extern List * PreprocessDropStatisticsStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessDropStatisticsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString);
|
||||
extern ObjectAddress AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk);
|
||||
extern List * PreprocessAlterStatisticsStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterStatisticsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * GetExplicitStatisticsCommandList(Oid relationId);
|
||||
extern List * GetExplicitStatisticsSchemaIdList(Oid relationId);
|
||||
|
||||
|
@ -303,16 +349,20 @@ extern Node * ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStm
|
|||
|
||||
|
||||
/* table.c - forward declarations */
|
||||
extern List * PreprocessDropTableStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessDropTableStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern void PostprocessCreateTableStmt(CreateStmt *createStatement,
|
||||
const char *queryString);
|
||||
extern List * PostprocessAlterTableStmtAttachPartition(
|
||||
AlterTableStmt *alterTableStatement,
|
||||
const char *queryString);
|
||||
extern List * PostprocessAlterTableSchemaStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand);
|
||||
extern List * PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterTableSchemaStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||
const char *alterTableCommand);
|
||||
extern bool IsAlterTableRenameStmt(RenameStmt *renameStmt);
|
||||
|
@ -332,18 +382,28 @@ extern List * MakeNameListFromRangeVar(const RangeVar *rel);
|
|||
extern void PreprocessTruncateStatement(TruncateStmt *truncateStatement);
|
||||
|
||||
/* type.c - forward declarations */
|
||||
extern List * PreprocessCompositeTypeStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessCompositeTypeStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessCompositeTypeStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterTypeStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessCreateEnumStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterTypeStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessCreateEnumStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessCreateEnumStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterEnumStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterEnumStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessAlterEnumStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessDropTypeStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessRenameTypeStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessRenameTypeAttributeStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterTypeSchemaStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterTypeOwnerStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessDropTypeStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessRenameTypeStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessRenameTypeAttributeStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern List * PreprocessAlterTypeSchemaStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PreprocessAlterTypeOwnerStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessAlterTypeSchemaStmt(Node *stmt, const char *queryString);
|
||||
extern Node * CreateTypeStmtByObjectAddress(const ObjectAddress *address);
|
||||
extern ObjectAddress CompositeTypeStmtObjectAddress(Node *stmt, bool missing_ok);
|
||||
|
@ -384,7 +444,8 @@ extern List * PostprocessAlterTriggerDependsStmt(Node *node, const char *querySt
|
|||
extern void AlterTriggerDependsEventExtendNames(
|
||||
AlterObjectDependsStmt *alterTriggerDependsStmt,
|
||||
char *schemaName, uint64 shardId);
|
||||
extern List * PreprocessDropTriggerStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessDropTriggerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern void ErrorOutForTriggerIfNotCitusLocalTable(Oid relationId);
|
||||
extern void DropTriggerEventExtendNames(DropStmt *dropTriggerStmt, char *schemaName,
|
||||
uint64 shardId);
|
||||
|
@ -423,4 +484,8 @@ extern void CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys);
|
|||
extern bool ShouldPropagateSetCommand(VariableSetStmt *setStmt);
|
||||
extern void PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);
|
||||
|
||||
/* create_citus_local_table.c */
|
||||
|
||||
extern void CreateCitusLocalTable(Oid relationId, bool cascade);
|
||||
|
||||
#endif /*CITUS_COMMANDS_H */
|
||||
|
|
|
@ -413,8 +413,6 @@ NOTICE: executing the command locally: CREATE UNIQUE INDEX uniqueindex2_15040
|
|||
---- utility command execution ----
|
||||
---------------------------------------------------------------------
|
||||
SET search_path TO citus_local_tables_test_schema;
|
||||
-- any foreign key between citus local tables and other tables except reference tables cannot be set
|
||||
-- more tests at ref_citus_local_fkeys.sql
|
||||
-- between citus local tables and distributed tables
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_dist FOREIGN KEY(a) references distributed_table(a);
|
||||
ERROR: cannot create foreign key constraint since foreign keys from reference tables and citus local tables to distributed tables are not supported
|
||||
|
@ -422,15 +420,15 @@ ALTER TABLE distributed_table ADD CONSTRAINT fkey_dist_to_c FOREIGN KEY(a) refer
|
|||
ERROR: cannot create foreign key constraint since relations are not colocated or not referencing a reference table
|
||||
-- between citus local tables and local tables
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_local FOREIGN KEY(a) references local_table(a);
|
||||
ERROR: cannot create foreign key constraint as "local_table" is a postgres local table
|
||||
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1504027, 'citus_local_tables_test_schema', 1504037, 'citus_local_tables_test_schema', 'ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_local FOREIGN KEY(a) references local_table(a);')
|
||||
ALTER TABLE local_table
|
||||
ADD CONSTRAINT fkey_local_to_c FOREIGN KEY(a) references citus_local_table_1(a),
|
||||
ADD CONSTRAINT fkey_self FOREIGN KEY(a) references local_table(a);
|
||||
ERROR: cannot create foreign key constraint as "local_table" is a postgres local table
|
||||
ERROR: cannot execute ADD CONSTRAINT command with other subcommands
|
||||
ALTER TABLE local_table
|
||||
ADD COLUMN b int references citus_local_table_1(a),
|
||||
ADD COLUMN c int references local_table(a);
|
||||
ERROR: cannot create foreign key constraint as "local_table" is a postgres local table
|
||||
ERROR: cannot execute ADD COLUMN command with PRIMARY KEY, UNIQUE, FOREIGN and CHECK constraints
|
||||
CREATE TABLE local_table_4 (
|
||||
a int unique references citus_local_table_1(a),
|
||||
b int references local_table_4(a));
|
||||
|
@ -478,6 +476,8 @@ ORDER BY 1;
|
|||
TRUNCATE citus_local_table_1, citus_local_table_2, distributed_table, local_table, reference_table;
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_tables_test_schema.citus_local_table_1_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_tables_test_schema.citus_local_table_2_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_tables_test_schema.local_table_xxxxx CASCADE
|
||||
NOTICE: truncate cascades to table "citus_local_table_1_xxxxxxx"
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_tables_test_schema.reference_table_xxxxx CASCADE
|
||||
-- test vacuum
|
||||
VACUUM citus_local_table_1;
|
||||
|
@ -485,6 +485,8 @@ VACUUM citus_local_table_1, distributed_table, local_table, reference_table;
|
|||
-- test drop
|
||||
DROP TABLE citus_local_table_1, citus_local_table_2, distributed_table, local_table, reference_table;
|
||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.reference_table_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.local_table_xxxxx CASCADE
|
||||
NOTICE: drop cascades to constraint fkey_c_to_local_1504027 on table citus_local_tables_test_schema.citus_local_table_1_1504027
|
||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_2_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_1_xxxxx CASCADE
|
||||
-- test some other udf's with citus local tables
|
||||
|
@ -498,9 +500,9 @@ SELECT create_citus_local_table('citus_local_table_4');
|
|||
-- should work --
|
||||
-- insert some data & create an index for table size udf's
|
||||
INSERT INTO citus_local_table_4 VALUES (1), (2), (3);
|
||||
NOTICE: executing the command locally: INSERT INTO citus_local_tables_test_schema.citus_local_table_4_1504037 AS citus_table_alias (a) VALUES (1), (2), (3)
|
||||
NOTICE: executing the command locally: INSERT INTO citus_local_tables_test_schema.citus_local_table_4_1504038 AS citus_table_alias (a) VALUES (1), (2), (3)
|
||||
CREATE INDEX citus_local_table_4_idx ON citus_local_table_4(a);
|
||||
NOTICE: executing the command locally: CREATE INDEX citus_local_table_4_idx_1504037 ON citus_local_tables_test_schema.citus_local_table_4_1504037 USING btree (a )
|
||||
NOTICE: executing the command locally: CREATE INDEX citus_local_table_4_idx_1504038 ON citus_local_tables_test_schema.citus_local_table_4_1504038 USING btree (a )
|
||||
SELECT citus_table_size('citus_local_table_4');
|
||||
citus_table_size
|
||||
---------------------------------------------------------------------
|
||||
|
@ -587,7 +589,7 @@ BEGIN;
|
|||
SELECT tableName FROM pg_catalog.pg_tables WHERE tablename LIKE 'citus_local_table_4%';
|
||||
tablename
|
||||
---------------------------------------------------------------------
|
||||
citus_local_table_4_1504037
|
||||
citus_local_table_4_1504038
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
|
@ -596,7 +598,7 @@ SELECT shardid, get_colocated_shard_array(shardid)
|
|||
FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='citus_local_table_4'::regclass) as shardid;
|
||||
shardid | get_colocated_shard_array
|
||||
---------------------------------------------------------------------
|
||||
1504037 | {1504037}
|
||||
1504038 | {1504038}
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
|
@ -626,7 +628,7 @@ ERROR: cannot delete from table
|
|||
CREATE TABLE postgres_local_table (a int);
|
||||
SELECT master_append_table_to_shard(shardId, 'postgres_local_table', 'localhost', :master_port)
|
||||
FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='citus_local_table_4'::regclass) as shardid;
|
||||
ERROR: cannot append to shardId 1504037
|
||||
ERROR: cannot append to shardId 1504038
|
||||
-- return true
|
||||
SELECT citus_table_is_visible('citus_local_table_4'::regclass::oid);
|
||||
citus_table_is_visible
|
||||
|
@ -667,7 +669,7 @@ SELECT create_citus_local_table('referenced_table');
|
|||
(1 row)
|
||||
|
||||
ALTER TABLE referencing_table ADD CONSTRAINT fkey_cl_to_cl FOREIGN KEY (a) REFERENCES referenced_table(a);
|
||||
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1504038, 'citus_local_tables_test_schema', 1504039, 'citus_local_tables_test_schema', 'ALTER TABLE referencing_table ADD CONSTRAINT fkey_cl_to_cl FOREIGN KEY (a) REFERENCES referenced_table(a);')
|
||||
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1504039, 'citus_local_tables_test_schema', 1504040, 'citus_local_tables_test_schema', 'ALTER TABLE referencing_table ADD CONSTRAINT fkey_cl_to_cl FOREIGN KEY (a) REFERENCES referenced_table(a);')
|
||||
-- observe the debug messages telling that we switch to sequential
|
||||
-- execution when truncating a citus local table that is referenced
|
||||
-- by another table
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
\set VERBOSITY terse
|
||||
SET citus.next_shard_id TO 1518000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE SCHEMA fkeys_between_local_ref;
|
||||
SET search_path TO fkeys_between_local_ref;
|
||||
SET client_min_messages to ERROR;
|
||||
-- create a view for testing
|
||||
CREATE VIEW citus_local_tables_in_schema AS
|
||||
SELECT logicalrelid FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='fkeys_between_local_ref' AND
|
||||
partmethod = 'n' AND repmodel = 'c';
|
||||
-- remove coordinator if it is added to pg_dist_node and test
|
||||
-- behavior when coordinator is not added to metadata
|
||||
SELECT COUNT(master_remove_node(nodename, nodeport)) < 2
|
||||
FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:master_port;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
create table ref (a int primary key);
|
||||
select create_reference_table('ref');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- creating local table that references to reference table is supported
|
||||
create table other (x int primary key, y int);
|
||||
-- creating reference table from a local table that references
|
||||
-- to reference table is supported
|
||||
alter table other add constraint fk foreign key (y) references ref (a) on delete cascade;
|
||||
select create_reference_table('other');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
drop table if exists ref, ref2 cascade;
|
||||
create table ref (a int primary key);
|
||||
create table ref2 (x int);
|
||||
alter table ref2 add constraint fk foreign key (x) references ref (a);
|
||||
select create_reference_table('ref');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- we can also define more foreign keys after creating reference
|
||||
-- table from referenced table
|
||||
alter table ref2 add constraint fk2 foreign key (x) references ref (a);
|
||||
-- then we can create reference table from referencing table
|
||||
select create_reference_table('ref2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
drop table if exists ref, ref2, other cascade;
|
||||
-- add coordinator to pg_dist_node for rest of the tests
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE local_table_1 (col_1 INT UNIQUE);
|
||||
CREATE TABLE local_table_2 (col_1 INT UNIQUE);
|
||||
CREATE TABLE local_table_3 (col_1 INT UNIQUE);
|
||||
CREATE TABLE local_table_4 (col_1 INT UNIQUE);
|
||||
INSERT INTO local_table_1 SELECT i FROM generate_series(195, 205) i;
|
||||
INSERT INTO local_table_2 SELECT i FROM generate_series(195, 205) i;
|
||||
INSERT INTO local_table_3 SELECT i FROM generate_series(195, 205) i;
|
||||
INSERT INTO local_table_4 SELECT i FROM generate_series(195, 205) i;
|
||||
-- _
|
||||
-- | |
|
||||
-- | v
|
||||
-- local_table_2 -> local_table_1 -> local_table_4
|
||||
-- ^ | |
|
||||
-- | v |
|
||||
-- local_table_3 <--------
|
||||
ALTER TABLE local_table_2 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
ALTER TABLE local_table_3 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES local_table_3(col_1);
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES local_table_4(col_1);
|
||||
ALTER TABLE local_table_4 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES local_table_3(col_1);
|
||||
ALTER TABLE local_table_4 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES local_table_4(col_1);
|
||||
CREATE TABLE reference_table_1(col_1 INT UNIQUE, col_2 INT);
|
||||
INSERT INTO reference_table_1 SELECT i FROM generate_series(195, 205) i;
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
INSERT INTO partitioned_table_1 SELECT i FROM generate_series(195, 205) i;
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES local_table_4(col_1);
|
||||
-- now that we attached partitioned table to graph below errors out
|
||||
-- since we cannot create citus local table from partitioned tables
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
ERROR: cannot create citus local table "partitioned_table_1", only regular tables and foreign tables are supported for citus local table creation
|
||||
ALTER TABLE partitioned_table_1 DROP CONSTRAINT fkey_8;
|
||||
BEGIN;
|
||||
-- now that we detached partitioned table from graph, succeeds
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
-- show that we converted all 4 local tables in this schema to citus local tables
|
||||
SELECT COUNT(*)=4 FROM citus_local_tables_in_schema;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- this actually attempts to convert local tables to citus local tables but errors out
|
||||
-- as citus doesn't support defining foreign keys via add column commands
|
||||
ALTER TABLE local_table_1 ADD COLUMN col_3 INT REFERENCES reference_table_1(col_1);
|
||||
ERROR: cannot execute ADD COLUMN command with PRIMARY KEY, UNIQUE, FOREIGN and CHECK constraints
|
||||
BEGIN;
|
||||
-- define a foreign key so that all 4 local tables become citus local tables
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
CREATE TABLE local_table_5 (col_1 INT UNIQUE);
|
||||
-- now define foreign key from local to citus local table
|
||||
ALTER TABLE local_table_5 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_1) REFERENCES local_table_2(col_1);
|
||||
-- now we have 5 citus local tables in this schema
|
||||
SELECT COUNT(*)=5 FROM citus_local_tables_in_schema;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- they fail as local_table_99 does not exist
|
||||
ALTER TABLE local_table_99 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ERROR: relation "local_table_99" does not exist
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES local_table_99(col_1);
|
||||
ERROR: relation "local_table_99" does not exist
|
||||
-- they fail as col_99 does not exist
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_99) REFERENCES reference_table_1(col_1);
|
||||
ERROR: column "col_99" referenced in foreign key constraint does not exist
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES reference_table_1(col_99);
|
||||
ERROR: column "col_99" referenced in foreign key constraint does not exist
|
||||
-- fails as col_2 does not have a unique/primary key constraint
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ERROR: there is no unique constraint matching given keys for referenced table "reference_table_1"
|
||||
CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT);
|
||||
INSERT INTO reference_table_2 SELECT i FROM generate_series(195, 205) i;
|
||||
BEGIN;
|
||||
-- define foreign key when both ends are local tables
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1);
|
||||
-- we still don't convert local tables to citus local tables when
|
||||
-- creating reference tables
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- now defining another foreign key would convert local tables to
|
||||
-- citus local tables as reference_table_2 is now a reference table
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1);
|
||||
-- now print metadata to show that everyting is fine
|
||||
SELECT logicalrelid::text AS tablename, partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid::text IN (SELECT tablename FROM pg_tables WHERE schemaname='fkeys_between_local_ref')
|
||||
ORDER BY tablename;
|
||||
tablename | partmethod | repmodel
|
||||
---------------------------------------------------------------------
|
||||
local_table_1 | n | c
|
||||
local_table_2 | n | c
|
||||
local_table_3 | n | c
|
||||
local_table_4 | n | c
|
||||
reference_table_1 | n | t
|
||||
reference_table_2 | n | t
|
||||
(6 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- we don't support foreign keys from citus local to reference tables
|
||||
-- with ON DELETE/UPDATE CASCADE behavior, so below two errors out
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES local_table_2(col_1) ON DELETE CASCADE;
|
||||
ERROR: cannot define foreign key constraint, foreign keys from reference tables to citus local tables can only be defined with NO ACTION or RESTRICT behaviors
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_local_to_ref FOREIGN KEY (col_1) REFERENCES local_table_2(col_1) ON UPDATE CASCADE;
|
||||
ERROR: cannot define foreign key constraint, foreign keys from reference tables to citus local tables can only be defined with NO ACTION or RESTRICT behaviors
|
||||
ROLLBACK;
|
||||
-- but we support such foreign key behaviors when foreign key is from
|
||||
-- citus local to reference table
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE local_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1) ON DELETE CASCADE;
|
||||
DELETE FROM reference_table_2 WHERE col_1=200;
|
||||
-- we deleted one row as DELETE cascades, so we should have 10 rows
|
||||
SELECT COUNT(*) FROM local_table_2;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE local_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1) ON UPDATE CASCADE;
|
||||
ROLLBACK;
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA fkeys_between_local_ref CASCADE;
|
|
@ -327,6 +327,7 @@ test: citus_local_tables
|
|||
test: multi_row_router_insert mixed_relkind_tests create_ref_dist_from_citus_local
|
||||
test: undistribute_table_cascade
|
||||
test: create_citus_local_table_cascade
|
||||
test: fkeys_between_local_ref
|
||||
|
||||
test: remove_coordinator
|
||||
|
||||
|
|
|
@ -313,9 +313,6 @@ CREATE UNIQUE INDEX uniqueIndex2 ON "LocalTabLE.1!?!"(id);
|
|||
|
||||
SET search_path TO citus_local_tables_test_schema;
|
||||
|
||||
-- any foreign key between citus local tables and other tables except reference tables cannot be set
|
||||
-- more tests at ref_citus_local_fkeys.sql
|
||||
|
||||
-- between citus local tables and distributed tables
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_dist FOREIGN KEY(a) references distributed_table(a);
|
||||
ALTER TABLE distributed_table ADD CONSTRAINT fkey_dist_to_c FOREIGN KEY(a) references citus_local_table_1(a);
|
||||
|
|
|
@ -0,0 +1,179 @@
|
|||
\set VERBOSITY terse
|
||||
|
||||
SET citus.next_shard_id TO 1518000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE SCHEMA fkeys_between_local_ref;
|
||||
SET search_path TO fkeys_between_local_ref;
|
||||
|
||||
SET client_min_messages to ERROR;
|
||||
|
||||
-- create a view for testing
|
||||
CREATE VIEW citus_local_tables_in_schema AS
|
||||
SELECT logicalrelid FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='fkeys_between_local_ref' AND
|
||||
partmethod = 'n' AND repmodel = 'c';
|
||||
|
||||
|
||||
-- remove coordinator if it is added to pg_dist_node and test
|
||||
-- behavior when coordinator is not added to metadata
|
||||
SELECT COUNT(master_remove_node(nodename, nodeport)) < 2
|
||||
FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:master_port;
|
||||
|
||||
create table ref (a int primary key);
|
||||
select create_reference_table('ref');
|
||||
-- creating local table that references to reference table is supported
|
||||
create table other (x int primary key, y int);
|
||||
|
||||
-- creating reference table from a local table that references
|
||||
-- to reference table is supported
|
||||
alter table other add constraint fk foreign key (y) references ref (a) on delete cascade;
|
||||
select create_reference_table('other');
|
||||
|
||||
drop table if exists ref, ref2 cascade;
|
||||
|
||||
create table ref (a int primary key);
|
||||
create table ref2 (x int);
|
||||
alter table ref2 add constraint fk foreign key (x) references ref (a);
|
||||
select create_reference_table('ref');
|
||||
-- we can also define more foreign keys after creating reference
|
||||
-- table from referenced table
|
||||
alter table ref2 add constraint fk2 foreign key (x) references ref (a);
|
||||
-- then we can create reference table from referencing table
|
||||
select create_reference_table('ref2');
|
||||
|
||||
drop table if exists ref, ref2, other cascade;
|
||||
|
||||
|
||||
-- add coordinator to pg_dist_node for rest of the tests
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
|
||||
CREATE TABLE local_table_1 (col_1 INT UNIQUE);
|
||||
CREATE TABLE local_table_2 (col_1 INT UNIQUE);
|
||||
CREATE TABLE local_table_3 (col_1 INT UNIQUE);
|
||||
CREATE TABLE local_table_4 (col_1 INT UNIQUE);
|
||||
INSERT INTO local_table_1 SELECT i FROM generate_series(195, 205) i;
|
||||
INSERT INTO local_table_2 SELECT i FROM generate_series(195, 205) i;
|
||||
INSERT INTO local_table_3 SELECT i FROM generate_series(195, 205) i;
|
||||
INSERT INTO local_table_4 SELECT i FROM generate_series(195, 205) i;
|
||||
|
||||
-- _
|
||||
-- | |
|
||||
-- | v
|
||||
-- local_table_2 -> local_table_1 -> local_table_4
|
||||
-- ^ | |
|
||||
-- | v |
|
||||
-- local_table_3 <--------
|
||||
ALTER TABLE local_table_2 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
ALTER TABLE local_table_3 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES local_table_3(col_1);
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES local_table_4(col_1);
|
||||
ALTER TABLE local_table_4 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES local_table_3(col_1);
|
||||
ALTER TABLE local_table_4 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES local_table_4(col_1);
|
||||
|
||||
CREATE TABLE reference_table_1(col_1 INT UNIQUE, col_2 INT);
|
||||
INSERT INTO reference_table_1 SELECT i FROM generate_series(195, 205) i;
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
INSERT INTO partitioned_table_1 SELECT i FROM generate_series(195, 205) i;
|
||||
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES local_table_4(col_1);
|
||||
|
||||
-- now that we attached partitioned table to graph below errors out
|
||||
-- since we cannot create citus local table from partitioned tables
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
|
||||
ALTER TABLE partitioned_table_1 DROP CONSTRAINT fkey_8;
|
||||
|
||||
BEGIN;
|
||||
-- now that we detached partitioned table from graph, succeeds
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
||||
|
||||
-- show that we converted all 4 local tables in this schema to citus local tables
|
||||
SELECT COUNT(*)=4 FROM citus_local_tables_in_schema;
|
||||
ROLLBACK;
|
||||
|
||||
-- this actually attempts to convert local tables to citus local tables but errors out
|
||||
-- as citus doesn't support defining foreign keys via add column commands
|
||||
ALTER TABLE local_table_1 ADD COLUMN col_3 INT REFERENCES reference_table_1(col_1);
|
||||
|
||||
BEGIN;
|
||||
-- define a foreign key so that all 4 local tables become citus local tables
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
|
||||
CREATE TABLE local_table_5 (col_1 INT UNIQUE);
|
||||
-- now define foreign key from local to citus local table
|
||||
ALTER TABLE local_table_5 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_1) REFERENCES local_table_2(col_1);
|
||||
|
||||
-- now we have 5 citus local tables in this schema
|
||||
SELECT COUNT(*)=5 FROM citus_local_tables_in_schema;
|
||||
ROLLBACK;
|
||||
|
||||
-- they fail as local_table_99 does not exist
|
||||
ALTER TABLE local_table_99 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES local_table_99(col_1);
|
||||
|
||||
-- they fail as col_99 does not exist
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_99) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES reference_table_1(col_99);
|
||||
|
||||
-- fails as col_2 does not have a unique/primary key constraint
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
|
||||
CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT);
|
||||
INSERT INTO reference_table_2 SELECT i FROM generate_series(195, 205) i;
|
||||
|
||||
BEGIN;
|
||||
-- define foreign key when both ends are local tables
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1);
|
||||
|
||||
-- we still don't convert local tables to citus local tables when
|
||||
-- creating reference tables
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
|
||||
-- now defining another foreign key would convert local tables to
|
||||
-- citus local tables as reference_table_2 is now a reference table
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1);
|
||||
|
||||
-- now print metadata to show that everyting is fine
|
||||
SELECT logicalrelid::text AS tablename, partmethod, repmodel FROM pg_dist_partition
|
||||
WHERE logicalrelid::text IN (SELECT tablename FROM pg_tables WHERE schemaname='fkeys_between_local_ref')
|
||||
ORDER BY tablename;
|
||||
ROLLBACK;
|
||||
|
||||
-- we don't support foreign keys from citus local to reference tables
|
||||
-- with ON DELETE/UPDATE CASCADE behavior, so below two errors out
|
||||
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES local_table_2(col_1) ON DELETE CASCADE;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_local_to_ref FOREIGN KEY (col_1) REFERENCES local_table_2(col_1) ON UPDATE CASCADE;
|
||||
ROLLBACK;
|
||||
|
||||
-- but we support such foreign key behaviors when foreign key is from
|
||||
-- citus local to reference table
|
||||
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
ALTER TABLE local_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1) ON DELETE CASCADE;
|
||||
|
||||
DELETE FROM reference_table_2 WHERE col_1=200;
|
||||
-- we deleted one row as DELETE cascades, so we should have 10 rows
|
||||
SELECT COUNT(*) FROM local_table_2;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
ALTER TABLE local_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES reference_table_2(col_1) ON UPDATE CASCADE;
|
||||
ROLLBACK;
|
||||
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA fkeys_between_local_ref CASCADE;
|
Loading…
Reference in New Issue