create placeholder for policy ddl

pull/2254/head
Nils Dijk 2018-07-04 16:34:21 +02:00
parent 19e92ee369
commit c1c8c38dc9
9 changed files with 216 additions and 6 deletions

View File

@ -14,6 +14,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/connection/placement_connection.o \
src/backend/distributed/connection/remote_commands.o \
src/backend/distributed/ddl/foreign_constraint.o \
src/backend/distributed/ddl/policy.o \
src/backend/distributed/executor/citus_custom_scan.o \
src/backend/distributed/executor/insert_select_executor.o \
src/backend/distributed/executor/intermediate_results.o \

View File

@ -46,6 +46,7 @@
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/policy.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
@ -765,6 +766,8 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
InvalidateForeignKeyGraph();
}
ErrorIfUnsupportedPolicy(relation);
relation_close(relation, NoLock);
}

View File

@ -0,0 +1,127 @@
/*-------------------------------------------------------------------------
* policy.c
*
* This file contains functions to create, alter and drop policies on
* distributed tables.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "commands/policy.h"
#include "distributed/metadata_cache.h"
#include "distributed/policy.h"
#include "utils/builtins.h"
/* placeholder for CreatePolicyCommands */
List *
CreatePolicyCommands(Oid relationId)
{
/* placeholder for future implementation */
return NIL;
}
/* placeholder for PlanCreatePolicyStmt */
List *
PlanCreatePolicyStmt(CreatePolicyStmt *stmt)
{
Oid relationId = RangeVarGetRelid(stmt->table,
AccessExclusiveLock,
false);
if (IsDistributedTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("policies on distributed tables are only supported in "
"Citus Enterprise")));
}
/* placeholder for future implementation */
return NIL;
}
/* placeholder for PlanAlterPolicyStmt */
List *
PlanAlterPolicyStmt(AlterPolicyStmt *stmt)
{
/* placeholder for future implementation */
return NIL;
}
/* placeholder for ErrorIfUnsupportedPolicy */
void
ErrorIfUnsupportedPolicy(Relation relation)
{
if (relation_has_policies(relation))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("policies on distributed tables are only supported in "
"Citus Enterprise"),
errhint("Remove any policies on a table before distributing")));
}
}
/* placeholder for ErrorIfUnsupportedPolicyExpr */
void
ErrorIfUnsupportedPolicyExpr(Node *expr)
{
/* placeholder for future implementation */
}
/* placeholder for PlanDropPolicyStmt */
List *
PlanDropPolicyStmt(DropStmt *stmt, const char *queryString)
{
/* placeholder for future implementation */
return NIL;
}
/* placeholder for IsPolicyRenameStmt */
bool
IsPolicyRenameStmt(RenameStmt *stmt)
{
/* placeholder for future implementation */
return false;
}
/* placeholder for CreatePolicyEventExtendNames */
void
CreatePolicyEventExtendNames(CreatePolicyStmt *stmt, const char *schemaName, uint64
shardId)
{
/* placeholder for future implementation */
}
/* placeholder for AlterPolicyEventExtendNames */
void
AlterPolicyEventExtendNames(AlterPolicyStmt *stmt, const char *schemaName, uint64 shardId)
{
/* placeholder for future implementation */
}
/* placeholder for RenamePolicyEventExtendNames */
void
RenamePolicyEventExtendNames(RenameStmt *stmt, const char *schemaName, uint64 shardId)
{
/* placeholder for future implementation */
}
/* placeholder for DropPolicyEventExtendNames */
void
DropPolicyEventExtendNames(DropStmt *dropStmt, const char *schemaName, uint64 shardId)
{
/* placeholder for future implementation */
}

View File

@ -49,6 +49,7 @@
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
#include "distributed/pg_dist_partition.h"
#include "distributed/policy.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
@ -157,7 +158,6 @@ static bool IsIndexRenameStmt(RenameStmt *renameStmt);
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *command);
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
static List * DDLTaskList(Oid relationId, const char *commandString);
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
@ -386,6 +386,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
{
ProcessDropSchemaStmt(dropStatement);
}
if (dropStatement->removeType == OBJECT_POLICY)
{
ddlJobs = PlanDropPolicyStmt(dropStatement, queryString);
}
}
if (IsA(parsetree, AlterTableStmt))
@ -417,6 +422,16 @@ multi_ProcessUtility(PlannedStmt *pstmt,
ddlJobs = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString);
}
if (IsA(parsetree, CreatePolicyStmt))
{
ddlJobs = PlanCreatePolicyStmt((CreatePolicyStmt *) parsetree);
}
if (IsA(parsetree, AlterPolicyStmt))
{
ddlJobs = PlanAlterPolicyStmt((AlterPolicyStmt *) parsetree);
}
/*
* ALTER TABLE ALL IN TABLESPACE statements have their node type as
* AlterTableMoveAllStmt. At the moment we do not support this functionality in
@ -1490,7 +1505,8 @@ PlanRenameStmt(RenameStmt *renameStmt, const char *renameCommand)
* our list include only renaming table and index (related) objects.
*/
if (!IsAlterTableRenameStmt(renameStmt) &&
!IsIndexRenameStmt(renameStmt))
!IsIndexRenameStmt(renameStmt) &&
!IsPolicyRenameStmt(renameStmt))
{
return NIL;
}
@ -1519,6 +1535,7 @@ PlanRenameStmt(RenameStmt *renameStmt, const char *renameCommand)
case OBJECT_TABLE:
case OBJECT_COLUMN:
case OBJECT_TABCONSTRAINT:
case OBJECT_POLICY:
{
/* the target object is our tableRelationId. */
tableRelationId = objectRelationId;
@ -3026,7 +3043,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
* DDLTaskList builds a list of tasks to execute a DDL command on a
* given list of shards.
*/
static List *
List *
DDLTaskList(Oid relationId, const char *commandString)
{
List *taskList = NIL;

View File

@ -46,6 +46,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/policy.h"
#include "distributed/worker_manager.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
@ -558,6 +559,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
List *tableCreationCommandList = NIL;
List *indexAndConstraintCommandList = NIL;
List *replicaIdentityEvents = NIL;
List *policyCommands = NIL;
tableCreationCommandList = GetTableCreationCommands(relationId,
includeSequenceDefaults);
@ -569,6 +571,9 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId);
tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents);
policyCommands = CreatePolicyCommands(relationId);
tableDDLEventList = list_concat(tableDDLEventList, policyCommands);
return tableDDLEventList;
}

View File

@ -30,6 +30,7 @@
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "distributed/metadata_cache.h"
#include "distributed/policy.h"
#include "distributed/relay_utility.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
@ -51,7 +52,6 @@
/* Local functions forward declarations */
static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId);
static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName);
static bool UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId);
/* exports for SQL callable functions */
@ -277,6 +277,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
relationName = &(relationNameValue->val.str);
AppendShardIdToName(relationName, shardId);
}
else if (objectType == OBJECT_POLICY)
{
DropPolicyEventExtendNames(dropStmt, schemaName, shardId);
}
else
{
ereport(WARNING, (errmsg("unsafe object type in drop statement"),
@ -310,6 +314,20 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
break;
}
case T_CreatePolicyStmt:
{
CreatePolicyEventExtendNames((CreatePolicyStmt *) parseTree, schemaName,
shardId);
break;
}
case T_AlterPolicyStmt:
{
AlterPolicyEventExtendNames((AlterPolicyStmt *) parseTree, schemaName,
shardId);
break;
}
case T_IndexStmt:
{
IndexStmt *indexStmt = (IndexStmt *) parseTree;
@ -430,6 +448,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
AppendShardIdToName(relationName, shardId);
}
else if (objectType == OBJECT_POLICY)
{
RenamePolicyEventExtendNames(renameStmt, schemaName, shardId);
}
else
{
ereport(WARNING, (errmsg("unsafe object type in rename statement"),
@ -623,8 +645,8 @@ UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId)
* SetSchemaNameIfNotExist function checks whether schemaName is set and if it is not set
* it sets its value to given newSchemaName.
*/
static void
SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName)
void
SetSchemaNameIfNotExist(char **schemaName, const char *newSchemaName)
{
if ((*schemaName) == NULL)
{

View File

@ -51,6 +51,7 @@ extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMet
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
extern List * DDLTaskList(Oid relationId, const char *commandString);
extern const char * RoleSpecString(RoleSpec *spec);
#endif /* MULTI_UTILITY_H */

View File

@ -0,0 +1,32 @@
/*-------------------------------------------------------------------------
* policy.h
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_POLICY_H
#define CITUS_POLICY_H
#include "utils/rel.h"
extern List * CreatePolicyCommands(Oid relationId);
extern void ErrorIfUnsupportedPolicy(Relation relation);
extern void ErrorIfUnsupportedPolicyExpr(Node *expr);
extern List * PlanCreatePolicyStmt(CreatePolicyStmt *stmt);
extern List * PlanAlterPolicyStmt(AlterPolicyStmt *stmt);
extern List * PlanDropPolicyStmt(DropStmt *stmt, const char *queryString);
extern bool IsPolicyRenameStmt(RenameStmt *stmt);
extern void CreatePolicyEventExtendNames(CreatePolicyStmt *stmt, const char *schemaName,
uint64 shardId);
extern void AlterPolicyEventExtendNames(AlterPolicyStmt *stmt, const char *schemaName,
uint64 shardId);
extern void RenamePolicyEventExtendNames(RenameStmt *stmt, const char *schemaName, uint64
shardId);
extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, uint64
shardId);
#endif /*CITUS_POLICY_H */

View File

@ -49,4 +49,6 @@ extern void RelayEventExtendNamesForInterShardCommands(Node *parseTree,
char *rightShardSchemaName);
extern void AppendShardIdToName(char **name, uint64 shardId);
extern void SetSchemaNameIfNotExist(char **schemaName, const char *newSchemaName);
#endif /* RELAY_UTILITY_H */