diff --git a/Makefile b/Makefile index c251f6ce3..2eb5fe342 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/connection/placement_connection.o \ src/backend/distributed/connection/remote_commands.o \ src/backend/distributed/ddl/foreign_constraint.o \ + src/backend/distributed/ddl/policy.o \ src/backend/distributed/executor/citus_custom_scan.o \ src/backend/distributed/executor/insert_select_executor.o \ src/backend/distributed/executor/intermediate_results.o \ diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 17f741190..55b5ae30b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -46,6 +46,7 @@ #include "distributed/multi_utility.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" +#include "distributed/policy.h" #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" @@ -765,6 +766,8 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, InvalidateForeignKeyGraph(); } + ErrorIfUnsupportedPolicy(relation); + relation_close(relation, NoLock); } diff --git a/src/backend/distributed/ddl/policy.c b/src/backend/distributed/ddl/policy.c new file mode 100644 index 000000000..3d523aeeb --- /dev/null +++ b/src/backend/distributed/ddl/policy.c @@ -0,0 +1,127 @@ +/*------------------------------------------------------------------------- + * policy.c + * + * This file contains functions to create, alter and drop policies on + * distributed tables. + * + * Copyright (c) 2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/namespace.h" +#include "commands/policy.h" +#include "distributed/metadata_cache.h" +#include "distributed/policy.h" +#include "utils/builtins.h" + + +/* placeholder for CreatePolicyCommands */ +List * +CreatePolicyCommands(Oid relationId) +{ + /* placeholder for future implementation */ + return NIL; +} + + +/* placeholder for PlanCreatePolicyStmt */ +List * +PlanCreatePolicyStmt(CreatePolicyStmt *stmt) +{ + Oid relationId = RangeVarGetRelid(stmt->table, + AccessExclusiveLock, + false); + if (IsDistributedTable(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("policies on distributed tables are only supported in " + "Citus Enterprise"))); + } + + /* placeholder for future implementation */ + return NIL; +} + + +/* placeholder for PlanAlterPolicyStmt */ +List * +PlanAlterPolicyStmt(AlterPolicyStmt *stmt) +{ + /* placeholder for future implementation */ + return NIL; +} + + +/* placeholder for ErrorIfUnsupportedPolicy */ +void +ErrorIfUnsupportedPolicy(Relation relation) +{ + if (relation_has_policies(relation)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("policies on distributed tables are only supported in " + "Citus Enterprise"), + errhint("Remove any policies on a table before distributing"))); + } +} + + +/* placeholder for ErrorIfUnsupportedPolicyExpr */ +void +ErrorIfUnsupportedPolicyExpr(Node *expr) +{ + /* placeholder for future implementation */ +} + + +/* placeholder for PlanDropPolicyStmt */ +List * +PlanDropPolicyStmt(DropStmt *stmt, const char *queryString) +{ + /* placeholder for future implementation */ + return NIL; +} + + +/* placeholder for IsPolicyRenameStmt */ +bool +IsPolicyRenameStmt(RenameStmt *stmt) +{ + /* placeholder for future implementation */ + return false; +} + + +/* placeholder for CreatePolicyEventExtendNames */ +void +CreatePolicyEventExtendNames(CreatePolicyStmt *stmt, const char *schemaName, uint64 + shardId) +{ + /* placeholder for future implementation */ +} + + +/* placeholder for AlterPolicyEventExtendNames */ +void +AlterPolicyEventExtendNames(AlterPolicyStmt *stmt, const char *schemaName, uint64 shardId) +{ + /* placeholder for future implementation */ +} + + +/* placeholder for RenamePolicyEventExtendNames */ +void +RenamePolicyEventExtendNames(RenameStmt *stmt, const char *schemaName, uint64 shardId) +{ + /* placeholder for future implementation */ +} + + +/* placeholder for DropPolicyEventExtendNames */ +void +DropPolicyEventExtendNames(DropStmt *dropStmt, const char *schemaName, uint64 shardId) +{ + /* placeholder for future implementation */ +} diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 209b0458b..848524f93 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -49,6 +49,7 @@ #include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" /* IWYU pragma: keep */ #include "distributed/pg_dist_partition.h" +#include "distributed/policy.h" #include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" @@ -157,7 +158,6 @@ static bool IsIndexRenameStmt(RenameStmt *renameStmt); static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement, AlterTableCmd *command); static void ExecuteDistributedDDLJob(DDLJob *ddlJob); -static List * DDLTaskList(Oid relationId, const char *commandString); static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, @@ -386,6 +386,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, { ProcessDropSchemaStmt(dropStatement); } + + if (dropStatement->removeType == OBJECT_POLICY) + { + ddlJobs = PlanDropPolicyStmt(dropStatement, queryString); + } } if (IsA(parsetree, AlterTableStmt)) @@ -417,6 +422,16 @@ multi_ProcessUtility(PlannedStmt *pstmt, ddlJobs = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString); } + if (IsA(parsetree, CreatePolicyStmt)) + { + ddlJobs = PlanCreatePolicyStmt((CreatePolicyStmt *) parsetree); + } + + if (IsA(parsetree, AlterPolicyStmt)) + { + ddlJobs = PlanAlterPolicyStmt((AlterPolicyStmt *) parsetree); + } + /* * ALTER TABLE ALL IN TABLESPACE statements have their node type as * AlterTableMoveAllStmt. At the moment we do not support this functionality in @@ -1490,7 +1505,8 @@ PlanRenameStmt(RenameStmt *renameStmt, const char *renameCommand) * our list include only renaming table and index (related) objects. */ if (!IsAlterTableRenameStmt(renameStmt) && - !IsIndexRenameStmt(renameStmt)) + !IsIndexRenameStmt(renameStmt) && + !IsPolicyRenameStmt(renameStmt)) { return NIL; } @@ -1519,6 +1535,7 @@ PlanRenameStmt(RenameStmt *renameStmt, const char *renameCommand) case OBJECT_TABLE: case OBJECT_COLUMN: case OBJECT_TABCONSTRAINT: + case OBJECT_POLICY: { /* the target object is our tableRelationId. */ tableRelationId = objectRelationId; @@ -3026,7 +3043,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) * DDLTaskList builds a list of tasks to execute a DDL command on a * given list of shards. */ -static List * +List * DDLTaskList(Oid relationId, const char *commandString) { List *taskList = NIL; diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 4b36f6d9f..acff6fac4 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -46,6 +46,7 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/pg_dist_shard.h" +#include "distributed/policy.h" #include "distributed/worker_manager.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" @@ -558,6 +559,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) List *tableCreationCommandList = NIL; List *indexAndConstraintCommandList = NIL; List *replicaIdentityEvents = NIL; + List *policyCommands = NIL; tableCreationCommandList = GetTableCreationCommands(relationId, includeSequenceDefaults); @@ -569,6 +571,9 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId); tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents); + policyCommands = CreatePolicyCommands(relationId); + tableDDLEventList = list_concat(tableDDLEventList, policyCommands); + return tableDDLEventList; } diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 1a5eb0d6c..6a3e659cd 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -30,6 +30,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_constraint.h" #include "distributed/metadata_cache.h" +#include "distributed/policy.h" #include "distributed/relay_utility.h" #include "distributed/version_compat.h" #include "lib/stringinfo.h" @@ -51,7 +52,6 @@ /* Local functions forward declarations */ static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId); -static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName); static bool UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId); /* exports for SQL callable functions */ @@ -277,6 +277,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) relationName = &(relationNameValue->val.str); AppendShardIdToName(relationName, shardId); } + else if (objectType == OBJECT_POLICY) + { + DropPolicyEventExtendNames(dropStmt, schemaName, shardId); + } else { ereport(WARNING, (errmsg("unsafe object type in drop statement"), @@ -310,6 +314,20 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) break; } + case T_CreatePolicyStmt: + { + CreatePolicyEventExtendNames((CreatePolicyStmt *) parseTree, schemaName, + shardId); + break; + } + + case T_AlterPolicyStmt: + { + AlterPolicyEventExtendNames((AlterPolicyStmt *) parseTree, schemaName, + shardId); + break; + } + case T_IndexStmt: { IndexStmt *indexStmt = (IndexStmt *) parseTree; @@ -430,6 +448,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) AppendShardIdToName(relationName, shardId); } + else if (objectType == OBJECT_POLICY) + { + RenamePolicyEventExtendNames(renameStmt, schemaName, shardId); + } else { ereport(WARNING, (errmsg("unsafe object type in rename statement"), @@ -623,8 +645,8 @@ UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId) * SetSchemaNameIfNotExist function checks whether schemaName is set and if it is not set * it sets its value to given newSchemaName. */ -static void -SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName) +void +SetSchemaNameIfNotExist(char **schemaName, const char *newSchemaName) { if ((*schemaName) == NULL) { diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 45c38252b..a10174fbb 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -51,6 +51,7 @@ extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMet extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS); +extern List * DDLTaskList(Oid relationId, const char *commandString); extern const char * RoleSpecString(RoleSpec *spec); #endif /* MULTI_UTILITY_H */ diff --git a/src/include/distributed/policy.h b/src/include/distributed/policy.h new file mode 100644 index 000000000..65ac6c18b --- /dev/null +++ b/src/include/distributed/policy.h @@ -0,0 +1,32 @@ +/*------------------------------------------------------------------------- + * policy.h + * + * Copyright (c) 2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_POLICY_H +#define CITUS_POLICY_H + +#include "utils/rel.h" + +extern List * CreatePolicyCommands(Oid relationId); +extern void ErrorIfUnsupportedPolicy(Relation relation); +extern void ErrorIfUnsupportedPolicyExpr(Node *expr); + +extern List * PlanCreatePolicyStmt(CreatePolicyStmt *stmt); +extern List * PlanAlterPolicyStmt(AlterPolicyStmt *stmt); +extern List * PlanDropPolicyStmt(DropStmt *stmt, const char *queryString); +extern bool IsPolicyRenameStmt(RenameStmt *stmt); + +extern void CreatePolicyEventExtendNames(CreatePolicyStmt *stmt, const char *schemaName, + uint64 shardId); +extern void AlterPolicyEventExtendNames(AlterPolicyStmt *stmt, const char *schemaName, + uint64 shardId); +extern void RenamePolicyEventExtendNames(RenameStmt *stmt, const char *schemaName, uint64 + shardId); +extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, uint64 + shardId); + +#endif /*CITUS_POLICY_H */ diff --git a/src/include/distributed/relay_utility.h b/src/include/distributed/relay_utility.h index 3eda6fbd7..8919a307a 100644 --- a/src/include/distributed/relay_utility.h +++ b/src/include/distributed/relay_utility.h @@ -49,4 +49,6 @@ extern void RelayEventExtendNamesForInterShardCommands(Node *parseTree, char *rightShardSchemaName); extern void AppendShardIdToName(char **name, uint64 shardId); +extern void SetSchemaNameIfNotExist(char **schemaName, const char *newSchemaName); + #endif /* RELAY_UTILITY_H */