Merge pull request #5764 from citusdata/feature/domain-type

Feature: propagate DOMAIN objects
pull/5893/head
Nils Dijk 2022-04-08 16:14:18 +02:00 committed by GitHub
commit 31493288de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 3294 additions and 179 deletions

View File

@ -358,6 +358,15 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return CreateCollationDDLsIdempotent(dependency->objectId);
}
case OCLASS_CONSTRAINT:
{
/*
* Constraints can only be reached by domains, they resolve functions.
* Constraints themself are recreated by the domain recreation.
*/
return NIL;
}
case OCLASS_DATABASE:
{
List *databaseDDLCommands = NIL;

View File

@ -153,6 +153,14 @@ static DistributeObjectOps Any_CompositeType = {
.address = CompositeTypeStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Any_CreateDomain = {
.deparse = DeparseCreateDomainStmt,
.qualify = QualifyCreateDomainStmt,
.preprocess = PreprocessCreateDomainStmt,
.postprocess = PostprocessCreateDomainStmt,
.address = CreateDomainStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Any_CreateEnum = {
.deparse = DeparseCreateEnumStmt,
.qualify = QualifyCreateEnumStmt,
@ -305,6 +313,55 @@ static DistributeObjectOps Database_AlterOwner = {
.address = AlterDatabaseOwnerObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Domain_Alter = {
.deparse = DeparseAlterDomainStmt,
.qualify = QualifyAlterDomainStmt,
.preprocess = PreprocessAlterDomainStmt,
.postprocess = PostprocessAlterDomainStmt,
.address = AlterDomainStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Domain_AlterObjectSchema = {
.deparse = DeparseAlterDomainSchemaStmt,
.qualify = QualifyAlterDomainSchemaStmt,
.preprocess = PreprocessAlterDomainSchemaStmt,
.postprocess = PostprocessAlterDomainSchemaStmt,
.address = AlterTypeSchemaStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Domain_AlterOwner = {
.deparse = DeparseAlterDomainOwnerStmt,
.qualify = QualifyAlterDomainOwnerStmt,
.preprocess = PreprocessAlterDomainOwnerStmt,
.postprocess = PostprocessAlterDomainOwnerStmt,
.address = AlterDomainOwnerStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Domain_Drop = {
.deparse = DeparseDropDomainStmt,
.qualify = QualifyDropDomainStmt,
.preprocess = PreprocessDropDomainStmt,
.postprocess = NULL,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Domain_Rename = {
.deparse = DeparseRenameDomainStmt,
.qualify = QualifyRenameDomainStmt,
.preprocess = PreprocessRenameDomainStmt,
.postprocess = NULL,
.address = RenameDomainStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Domain_RenameConstraint = {
.deparse = DeparseDomainRenameConstraintStmt,
.qualify = QualifyDomainRenameConstraintStmt,
.preprocess = PreprocessDomainRenameConstraintStmt,
.postprocess = NULL,
.address = DomainRenameConstraintStmtObjectAddress,
.markDistributed = false,
};
static DistributeObjectOps Extension_AlterObjectSchema = {
.deparse = DeparseAlterExtensionSchemaStmt,
.qualify = NULL,
@ -815,6 +872,11 @@ GetDistributeObjectOps(Node *node)
{
switch (nodeTag(node))
{
case T_AlterDomainStmt:
{
return &Domain_Alter;
}
case T_AlterEnumStmt:
{
return &Any_AlterEnum;
@ -887,6 +949,11 @@ GetDistributeObjectOps(Node *node)
return &Collation_AlterObjectSchema;
}
case OBJECT_DOMAIN:
{
return &Domain_AlterObjectSchema;
}
case OBJECT_EXTENSION:
{
return &Extension_AlterObjectSchema;
@ -965,6 +1032,11 @@ GetDistributeObjectOps(Node *node)
return &Database_AlterOwner;
}
case OBJECT_DOMAIN:
{
return &Domain_AlterOwner;
}
case OBJECT_FOREIGN_SERVER:
{
return &ForeignServer_AlterOwner;
@ -1123,6 +1195,11 @@ GetDistributeObjectOps(Node *node)
return &Any_CompositeType;
}
case T_CreateDomainStmt:
{
return &Any_CreateDomain;
}
case T_CreateEnumStmt:
{
return &Any_CreateEnum;
@ -1210,6 +1287,11 @@ GetDistributeObjectOps(Node *node)
return &Collation_Drop;
}
case OBJECT_DOMAIN:
{
return &Domain_Drop;
}
case OBJECT_EXTENSION:
{
return &Extension_Drop;
@ -1339,6 +1421,16 @@ GetDistributeObjectOps(Node *node)
return &Collation_Rename;
}
case OBJECT_DOMAIN:
{
return &Domain_Rename;
}
case OBJECT_DOMCONSTRAINT:
{
return &Domain_RenameConstraint;
}
case OBJECT_FOREIGN_SERVER:
{
return &ForeignServer_Rename;

View File

@ -0,0 +1,702 @@
/*-------------------------------------------------------------------------
*
* domain.c
* Hooks to handle the creation, altering and removal of domains.
* These hooks are responsible for duplicating the changes to the
* workers nodes.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "parser/parse_type.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/regproc.h"
#include "utils/syscache.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_create_or_replace.h"
#include "distributed/worker_transaction.h"
static CollateClause * MakeCollateClauseFromOid(Oid collationOid);
static List * FilterNameListForDistributedDomains(List *domainNames, bool missing_ok,
List **distributedDomainAddresses);
static ObjectAddress GetDomainAddressByName(TypeName *domainName, bool missing_ok);
/*
* PreprocessCreateDomainStmt handles the propagation of the create domain statements.
*/
List *
PreprocessCreateDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode(node);
const char *sql = DeparseTreeNode(node);
sql = WrapCreateOrReplace(sql);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessCreateDomainStmt gets called after the domain has been created locally. When
* the domain is decided to be propagated we make sure all the domains dependencies exist
* on all workers.
*/
List *
PostprocessCreateDomainStmt(Node *node, const char *queryString)
{
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
/*
* find object address of the just created object, because the domain has been created
* locally it can't be missing
*/
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
EnsureDependenciesExistOnAllNodes(&typeAddress);
return NIL;
}
/*
* PreprocessDropDomainStmt gets called before dropping the domain locally. For
* distributed domains it will make sure the fully qualified statement is forwarded to all
* the workers reflecting the drop of the domain.
*/
List *
PreprocessDropDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
if (!ShouldPropagate())
{
return NIL;
}
QualifyTreeNode((Node *) stmt);
List *oldDomains = stmt->objects;
List *distributedDomainAddresses = NIL;
List *distributedDomains = FilterNameListForDistributedDomains(
oldDomains,
stmt->missing_ok,
&distributedDomainAddresses);
if (list_length(distributedDomains) <= 0)
{
/* no distributed domains to drop */
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
ObjectAddress *addressItem = NULL;
foreach_ptr(addressItem, distributedDomainAddresses)
{
UnmarkObjectDistributed(addressItem);
}
/*
* temporary swap the lists of objects to delete with the distributed objects and
* deparse to an executable sql statement for the workers
*/
stmt->objects = distributedDomains;
char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = oldDomains;
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDomainStmt gets called for all domain specific alter statements. When
* the change happens on a distributed domain we reflect the changes on the workers.
*/
List *
PreprocessAlterDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterDomainStmt *stmt = castNode(AlterDomainStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDomainStmt gets called after the domain has been altered locally. A
* change on the constraints could cause new (undistributed) objects to be dependencies of
* the domain. Here we recreate any new dependencies on the workers before we forward the
* alter statement to the workers.
*/
List *
PostprocessAlterDomainStmt(Node *node, const char *queryString)
{
AlterDomainStmt *stmt = castNode(AlterDomainStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&domainAddress);
return NIL;
}
/*
* PreprocessDomainRenameConstraintStmt gets called locally when a constraint on a domain
* is renamed. When the constraint is on a distributed domain we forward the statement
* appropriately.
*/
List *
PreprocessDomainRenameConstraintStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_DOMCONSTRAINT);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDomainOwnerStmt called locally when the owner of a constraint is
* changed. For distributed domains the statement is forwarded to all the workers.
*/
List *
PreprocessAlterDomainOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_DOMAIN);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDomainOwnerStmt given the change of ownership could cause new
* dependencies to exist for the domain we make sure all dependencies for the domain are
* created before we forward the statement to the workers.
*/
List *
PostprocessAlterDomainOwnerStmt(Node *node, const char *queryString)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&domainAddress);
return NIL;
}
/*
* PreprocessRenameDomainStmt creates the statements to execute on the workers when the
* domain being renamed is distributed.
*/
List *
PreprocessRenameDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_DOMAIN);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDomainSchemaStmt cretes the statements to execute on the workers when
* the domain being moved to a new schema has been distributed.
*/
List *
PreprocessAlterDomainSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_DOMAIN);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDomainSchemaStmt makes sure any new dependencies (as result of the
* schema move) are created on the workers before we forward the statement.
*/
List *
PostprocessAlterDomainSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&domainAddress);
return NIL;
}
/*
* FilterNameListForDistributedDomains given a liost of domain names we will return a list
* filtered with only the names of distributed domains remaining. The pointer to the list
* distributedDomainAddresses is populated with a list of ObjectAddresses of the domains
* that are distributed. Indices between the returned list and the object addresses are
* synchronizes.
* Note: the pointer in distributedDomainAddresses should not be omitted
*
* When missing_ok is false this function will raise an error if a domain identified by a
* domain name cannot be found.
*/
static List *
FilterNameListForDistributedDomains(List *domainNames, bool missing_ok,
List **distributedDomainAddresses)
{
Assert(distributedDomainAddresses != NULL);
List *distributedDomainNames = NIL;
TypeName *domainName = NULL;
foreach_ptr(domainName, domainNames)
{
ObjectAddress objectAddress = GetDomainAddressByName(domainName, missing_ok);
if (IsObjectDistributed(&objectAddress))
{
distributedDomainNames = lappend(distributedDomainNames, domainName);
if (distributedDomainAddresses)
{
ObjectAddress *allocatedAddress = palloc0(sizeof(ObjectAddress));
*allocatedAddress = objectAddress;
*distributedDomainAddresses = lappend(*distributedDomainAddresses,
allocatedAddress);
}
}
}
return distributedDomainNames;
}
/*
* GetDomainAddressByName returns the ObjectAddress of the domain identified by
* domainName. When missing_ok is true the object id part of the ObjectAddress can be
* InvalidOid. When missing_ok is false this function will raise an error instead when the
* domain can't be found.
*/
static ObjectAddress
GetDomainAddressByName(TypeName *domainName, bool missing_ok)
{
ObjectAddress address = { 0 };
Oid domainOid = LookupTypeNameOid(NULL, domainName, missing_ok);
ObjectAddressSet(address, TypeRelationId, domainOid);
return address;
}
/*
* RecreateDomainStmt returns a CreateDomainStmt pointer where the statement represents
* the creation of the domain to recreate the domain on a different postgres node based on
* the current representation in the local catalog.
*/
CreateDomainStmt *
RecreateDomainStmt(Oid domainOid)
{
CreateDomainStmt *stmt = makeNode(CreateDomainStmt);
stmt->domainname = stringToQualifiedNameList(format_type_be_qualified(domainOid));
HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(domainOid));
if (!HeapTupleIsValid(tup))
{
elog(ERROR, "cache lookup failed for type %u", domainOid);
}
Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup);
if (typTup->typtype != TYPTYPE_DOMAIN)
{
elog(ERROR, "type is not a domain type");
}
stmt->typeName = makeTypeNameFromOid(typTup->typbasetype, typTup->typtypmod);
if (OidIsValid(typTup->typcollation))
{
stmt->collClause = MakeCollateClauseFromOid(typTup->typcollation);
}
/*
* typdefault and typdefaultbin are potentially null, so don't try to
* access 'em as struct fields. Must do it the hard way with
* SysCacheGetAttr.
*/
bool isNull = false;
Datum typeDefaultDatum = SysCacheGetAttr(TYPEOID,
tup,
Anum_pg_type_typdefaultbin,
&isNull);
if (!isNull)
{
/* when not null there is default value which we should add as a constraint */
Constraint *constraint = makeNode(Constraint);
constraint->contype = CONSTR_DEFAULT;
constraint->cooked_expr = TextDatumGetCString(typeDefaultDatum);
stmt->constraints = lappend(stmt->constraints, constraint);
}
/* NOT NULL constraints are non-named on the actual type */
if (typTup->typnotnull)
{
Constraint *constraint = makeNode(Constraint);
constraint->contype = CONSTR_NOTNULL;
stmt->constraints = lappend(stmt->constraints, constraint);
}
/* lookup and look all constraints to add them to the CreateDomainStmt */
Relation conRel = table_open(ConstraintRelationId, AccessShareLock);
/* Look for CHECK Constraints on this domain */
ScanKeyData key[1];
ScanKeyInit(&key[0],
Anum_pg_constraint_contypid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(domainOid));
SysScanDesc scan = systable_beginscan(conRel, ConstraintTypidIndexId, true, NULL, 1,
key);
HeapTuple conTup = NULL;
while (HeapTupleIsValid(conTup = systable_getnext(scan)))
{
Form_pg_constraint c = (Form_pg_constraint) GETSTRUCT(conTup);
if (c->contype != CONSTRAINT_CHECK)
{
/* Ignore non-CHECK constraints, shouldn't be any */
continue;
}
/*
* We create a constraint, completely ignoring c->convalidated because we can't
* create a domain with an invalidated constraint. Once a constraint is added to
* a domain -even non valid-, all new data is validated. Meaning, creating a
* domain with a non-valid constraint doesn't make any sense.
*
* Given it will be too hard to defer the creation of a constraint till we
* validate the constraint on the coordinator we will simply create the
* non-validated constraint to ad hear to validating all new data.
*
* An edgecase here would be when moving existing data, that hasn't been validated
* before to an other node. This behaviour is consistent with sending it to an
* already existing node (that has the constraint created but not validated) and a
* new node.
*/
Constraint *constraint = makeNode(Constraint);
constraint->conname = pstrdup(NameStr(c->conname));
constraint->contype = CONSTR_CHECK; /* we only come here with check constraints */
/* Not expecting conbin to be NULL, but we'll test for it anyway */
Datum conbin = heap_getattr(conTup, Anum_pg_constraint_conbin, conRel->rd_att,
&isNull);
if (isNull)
{
elog(ERROR, "domain \"%s\" constraint \"%s\" has NULL conbin",
NameStr(typTup->typname), NameStr(c->conname));
}
/*
* The conbin containes the cooked expression from when the constraint was
* inserted into the catalog. We store it here for the deparser to distinguish
* between cooked expressions and raw expressions.
*
* There is no supported way to go from a cooked expression to a raw expression.
*/
constraint->cooked_expr = TextDatumGetCString(conbin);
stmt->constraints = lappend(stmt->constraints, constraint);
}
systable_endscan(scan);
table_close(conRel, NoLock);
ReleaseSysCache(tup);
QualifyTreeNode((Node *) stmt);
return stmt;
}
/*
* MakeCollateClauseFromOid returns a CollateClause describing the COLLATE segment of a
* CREATE DOMAIN statement based on the Oid of the collation used for the domain.
*/
static CollateClause *
MakeCollateClauseFromOid(Oid collationOid)
{
CollateClause *collateClause = makeNode(CollateClause);
ObjectAddress collateAddress = { 0 };
ObjectAddressSet(collateAddress, CollationRelationId, collationOid);
List *objName = NIL;
List *objArgs = NIL;
#if PG_VERSION_NUM >= PG_VERSION_14
getObjectIdentityParts(&collateAddress, &objName, &objArgs, false);
#else
getObjectIdentityParts(&collateAddress, &objName, &objArgs);
#endif
char *name = NULL;
foreach_ptr(name, objName)
{
collateClause->collname = lappend(collateClause->collname, makeString(name));
}
collateClause->location = -1;
return collateClause;
}
/*
* CreateDomainStmtObjectAddress returns the ObjectAddress of the domain that would be
* created by the statement. When missing_ok is false the function will raise an error if
* the domain cannot be found in the local catalog.
*/
ObjectAddress
CreateDomainStmtObjectAddress(Node *node, bool missing_ok)
{
CreateDomainStmt *stmt = castNode(CreateDomainStmt, node);
TypeName *typeName = makeTypeNameFromNameList(stmt->domainname);
Oid typeOid = LookupTypeNameOid(NULL, typeName, missing_ok);
ObjectAddress address = { 0 };
ObjectAddressSet(address, TypeRelationId, typeOid);
return address;
}
/*
* AlterDomainStmtObjectAddress returns the ObjectAddress of the domain being altered.
* When missing_ok is false this function will raise an error when the domain is not
* found.
*/
ObjectAddress
AlterDomainStmtObjectAddress(Node *node, bool missing_ok)
{
AlterDomainStmt *stmt = castNode(AlterDomainStmt, node);
TypeName *domainName = makeTypeNameFromNameList(stmt->typeName);
return GetDomainAddressByName(domainName, missing_ok);
}
/*
* DomainRenameConstraintStmtObjectAddress returns the ObjectAddress of the domain for
* which the constraint is being renamed. When missing_ok this function will raise an
* error if the domain cannot be found.
*/
ObjectAddress
DomainRenameConstraintStmtObjectAddress(Node *node, bool missing_ok)
{
RenameStmt *stmt = castNode(RenameStmt, node);
TypeName *domainName = makeTypeNameFromNameList(castNode(List, stmt->object));
return GetDomainAddressByName(domainName, missing_ok);
}
/*
* AlterDomainOwnerStmtObjectAddress returns the ObjectAddress for which the owner is
* being changed. When missing_ok is false this function will raise an error if the domain
* cannot be found.
*/
ObjectAddress
AlterDomainOwnerStmtObjectAddress(Node *node, bool missing_ok)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_DOMAIN);
TypeName *domainName = makeTypeNameFromNameList(castNode(List, stmt->object));
return GetDomainAddressByName(domainName, missing_ok);
}
/*
* RenameDomainStmtObjectAddress returns the ObjectAddress of the domain being renamed.
* When missing_ok is false this function will raise an error when the domain cannot be
* found.
*/
ObjectAddress
RenameDomainStmtObjectAddress(Node *node, bool missing_ok)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_DOMAIN);
TypeName *domainName = makeTypeNameFromNameList(castNode(List, stmt->object));
return GetDomainAddressByName(domainName, missing_ok);
}
/*
* get_constraint_typid returns the contypid of a constraint. This field is only set for
* constraints on domain types. Returns InvalidOid if conoid is an invalid constraint, as
* well as for constraints that are not on domain types.
*/
Oid
get_constraint_typid(Oid conoid)
{
HeapTuple tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
if (HeapTupleIsValid(tp))
{
Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
Oid result = contup->contypid;
ReleaseSysCache(tp);
return result;
}
else
{
return InvalidOid;
}
}

View File

@ -612,6 +612,11 @@ CreateTypeStmtByObjectAddress(const ObjectAddress *address)
return (Node *) RecreateCompositeTypeStmt(address->objectId);
}
case TYPTYPE_DOMAIN:
{
return (Node *) RecreateDomainStmt(address->objectId);
}
default:
{
ereport(ERROR, (errmsg("unsupported type to generate create statement for"),
@ -854,7 +859,7 @@ ObjectAddress
AlterTypeSchemaStmtObjectAddress(Node *node, bool missing_ok)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TYPE);
Assert(stmt->objectType == OBJECT_TYPE || stmt->objectType == OBJECT_DOMAIN);
List *names = (List *) stmt->object;

View File

@ -0,0 +1,626 @@
/*-------------------------------------------------------------------------
*
* deparse_domain_stmts.c
* Functions to turn all Statement structures related to domains back
* into sql.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "parser/parse_coerce.h"
#include "parser/parse_collate.h"
#include "parser/parse_expr.h"
#include "parser/parse_node.h"
#include "parser/parse_type.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/ruleutils.h"
#include "utils/syscache.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/namespace_utils.h"
static void AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName,
TypeName *typeName);
static Node * replace_domain_constraint_value(ParseState *pstate, ColumnRef *cref);
static Node * TransformDefaultExpr(Node *expr, List *domainName, TypeName *typeName);
static Node * TransformConstraintExpr(Node *expr, TypeName *typeName);
static CoerceToDomainValue * GetCoerceDomainValue(TypeName *typeName);
static char * TypeNameAsIdentifier(TypeName *typeName);
static Oid DomainGetBaseTypeOid(List *names, int32 *baseTypeMod);
static void AppendAlterDomainStmtSetDefault(StringInfo buf, AlterDomainStmt *stmt);
static void AppendAlterDomainStmtAddConstraint(StringInfo buf, AlterDomainStmt *stmt);
static void AppendAlterDomainStmtDropConstraint(StringInfo buf, AlterDomainStmt *stmt);
/*
* DeparseCreateDomainStmt returns the sql representation for the CREATE DOMAIN statement.
*/
char *
DeparseCreateDomainStmt(Node *node)
{
CreateDomainStmt *stmt = castNode(CreateDomainStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
const char *domainIdentifier = NameListToQuotedString(stmt->domainname);
const char *typeIdentifier = TypeNameAsIdentifier(stmt->typeName);
appendStringInfo(&buf, "CREATE DOMAIN %s AS %s", domainIdentifier, typeIdentifier);
if (stmt->collClause)
{
const char *collateIdentifier =
NameListToQuotedString(stmt->collClause->collname);
appendStringInfo(&buf, " COLLATE %s", collateIdentifier);
}
Constraint *constraint = NULL;
foreach_ptr(constraint, stmt->constraints)
{
AppendConstraint(&buf, constraint, stmt->domainname, stmt->typeName);
}
appendStringInfoString(&buf, ";");
return buf.data;
}
/*
* TypeNameAsIdentifier returns the sql identifier of a TypeName. This is more complex
* than concatenating the schema name and typename since certain types contain modifiers
* that need to be correctly represented.
*/
static char *
TypeNameAsIdentifier(TypeName *typeName)
{
int32 typmod = 0;
Oid typeOid = InvalidOid;
bits16 formatFlags = FORMAT_TYPE_TYPEMOD_GIVEN | FORMAT_TYPE_FORCE_QUALIFY;
typenameTypeIdAndMod(NULL, typeName, &typeOid, &typmod);
return format_type_extended(typeOid, typmod, formatFlags);
}
/*
* DeparseDropDomainStmt returns the sql for teh DROP DOMAIN statement.
*/
char *
DeparseDropDomainStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfoString(&buf, "DROP DOMAIN ");
if (stmt->missing_ok)
{
appendStringInfoString(&buf, "IF EXISTS ");
}
TypeName *domainName = NULL;
bool first = true;
foreach_ptr(domainName, stmt->objects)
{
if (!first)
{
appendStringInfoString(&buf, ", ");
}
first = false;
const char *identifier = NameListToQuotedString(domainName->names);
appendStringInfoString(&buf, identifier);
}
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(&buf, " CASCADE");
}
appendStringInfoString(&buf, ";");
return buf.data;
}
/*
* DeparseAlterDomainStmt returns the sql representation of the DOMAIN specific ALTER
* statements.
*/
char *
DeparseAlterDomainStmt(Node *node)
{
AlterDomainStmt *stmt = castNode(AlterDomainStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf, "ALTER DOMAIN %s ", NameListToQuotedString(stmt->typeName));
switch (stmt->subtype)
{
case 'T': /* SET DEFAULT */
{
AppendAlterDomainStmtSetDefault(&buf, stmt);
break;
}
case 'N': /* DROP NOT NULL */
{
appendStringInfoString(&buf, "DROP NOT NULL");
break;
}
case 'O': /* SET NOT NULL */
{
appendStringInfoString(&buf, "SET NOT NULL");
break;
}
case 'C': /* ADD [CONSTRAINT name] */
{
AppendAlterDomainStmtAddConstraint(&buf, stmt);
break;
}
case 'X': /* DROP CONSTRAINT */
{
AppendAlterDomainStmtDropConstraint(&buf, stmt);
break;
}
case 'V': /* VALIDATE CONSTRAINT */
{
appendStringInfo(&buf, "VALIDATE CONSTRAINT %s",
quote_identifier(stmt->name));
break;
}
default:
{
elog(ERROR, "unsupported alter domain statement for distribution");
}
}
appendStringInfoChar(&buf, ';');
return buf.data;
}
/*
* DeparseDomainRenameConstraintStmt returns the sql representation of the domain
* constraint renaming.
*/
char *
DeparseDomainRenameConstraintStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
char *domainIdentifier = NameListToQuotedString(castNode(List, stmt->object));
appendStringInfo(&buf, "ALTER DOMAIN %s RENAME CONSTRAINT %s TO %s;",
domainIdentifier,
quote_identifier(stmt->subname),
quote_identifier(stmt->newname));
return buf.data;
}
/*
* DeparseAlterDomainOwnerStmt returns the sql representation of the ALTER DOMAIN OWNER
* statement.
*/
char *
DeparseAlterDomainOwnerStmt(Node *node)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
List *domainName = castNode(List, stmt->object);
char *domainIdentifier = NameListToQuotedString(domainName);
appendStringInfo(&buf, "ALTER DOMAIN %s OWNER TO %s;",
domainIdentifier,
RoleSpecString(stmt->newowner, true));
return buf.data;
}
/*
* DeparseRenameDomainStmt returns the sql representation of the ALTER DOMAIN RENAME
* statement.
*/
char *
DeparseRenameDomainStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
List *domainName = castNode(List, stmt->object);
char *domainIdentifier = NameListToQuotedString(domainName);
appendStringInfo(&buf, "ALTER DOMAIN %s RENAME TO %s;",
domainIdentifier,
quote_identifier(stmt->newname));
return buf.data;
}
/*
* DeparseAlterDomainSchemaStmt returns the sql representation of the ALTER DOMAIN SET
* SCHEMA statement.
*/
char *
DeparseAlterDomainSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
List *domainName = castNode(List, stmt->object);
char *domainIdentifier = NameListToQuotedString(domainName);
appendStringInfo(&buf, "ALTER DOMAIN %s SET SCHEMA %s;",
domainIdentifier,
quote_identifier(stmt->newschema));
return buf.data;
}
/*
* DomainGetBaseTypeOid returns the type Oid and the type modifiers of the type underlying
* a domain addresses by the namelist provided as the names argument. The type modifier is
* only provided if the baseTypeMod pointer is a valid pointer on where to write the
* modifier (not a NULL pointer).
*
* If the type cannot be found this function will raise a non-userfacing error. Care needs
* to be taken by the caller that the domain is actually existing.
*/
static Oid
DomainGetBaseTypeOid(List *names, int32 *baseTypeMod)
{
TypeName *domainName = makeTypeNameFromNameList(names);
Oid domainoid = typenameTypeId(NULL, domainName);
HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(domainoid));
if (!HeapTupleIsValid(tup))
{
elog(ERROR, "cache lookup failed for type %u", domainoid);
}
Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup);
Oid baseTypeOid = typTup->typbasetype;
if (baseTypeMod)
{
*baseTypeMod = typTup->typtypmod;
}
ReleaseSysCache(tup);
return baseTypeOid;
}
/*
* AppendAlterDomainStmtSetDefault is a helper function that appends the default value
* portion of an ALTER DOMAIN statement that is changing the default value of the domain.
*/
static void
AppendAlterDomainStmtSetDefault(StringInfo buf, AlterDomainStmt *stmt)
{
if (stmt->def == NULL)
{
/* no default expression is a DROP DEFAULT statment */
appendStringInfoString(buf, "DROP DEFAULT");
return;
}
int32 baseTypMod = 0;
Oid baseOid = DomainGetBaseTypeOid(stmt->typeName, &baseTypMod);
TypeName *baseTypeName = makeTypeNameFromOid(baseOid, baseTypMod);
/* cook the default expression, without cooking we can't deparse */
Node *expr = stmt->def;
expr = TransformDefaultExpr(expr, stmt->typeName, baseTypeName);
/* deparse while the searchpath is cleared to force qualification of identifiers */
PushOverrideEmptySearchPath(CurrentMemoryContext);
char *exprSql = deparse_expression(expr, NIL, true, true);
PopOverrideSearchPath();
appendStringInfo(buf, "SET DEFAULT %s", exprSql);
}
/*
* AppendAlterDomainStmtAddConstraint is a helper function that appends the constraint
* specification for an ALTER DOMAIN statement that adds a constraint to the domain.
*/
static void
AppendAlterDomainStmtAddConstraint(StringInfo buf, AlterDomainStmt *stmt)
{
if (stmt->def == NULL || !IsA(stmt->def, Constraint))
{
ereport(ERROR, (errmsg("unable to deparse ALTER DOMAIN statement due to "
"unexpected contents")));
}
Constraint *constraint = castNode(Constraint, stmt->def);
appendStringInfoString(buf, "ADD");
int32 baseTypMod = 0;
Oid baseOid = DomainGetBaseTypeOid(stmt->typeName, &baseTypMod);
TypeName *baseTypeName = makeTypeNameFromOid(baseOid, baseTypMod);
AppendConstraint(buf, constraint, stmt->typeName, baseTypeName);
if (!constraint->initially_valid)
{
appendStringInfoString(buf, " NOT VALID");
}
}
/*
* AppendAlterDomainStmtDropConstraint is a helper function that appends the DROP
* CONSTRAINT part of an ALTER DOMAIN statement for an alter statement that drops a
* constraint.
*/
static void
AppendAlterDomainStmtDropConstraint(StringInfo buf, AlterDomainStmt *stmt)
{
appendStringInfoString(buf, "DROP CONSTRAINT ");
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
appendStringInfoString(buf, quote_identifier(stmt->name));
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
}
/*
* AppendConstraint is a helper function that appends a constraint specification to a sql
* string that is adding a constraint.
*
* There are multiple places where a constraint specification is appended to sql strings.
*
* Given the complexities of serializing a constraint they all use this routine.
*/
static void
AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName,
TypeName *typeName)
{
if (constraint->conname)
{
appendStringInfo(buf, " CONSTRAINT %s", quote_identifier(constraint->conname));
}
switch (constraint->contype)
{
case CONSTR_CHECK:
{
Node *expr = NULL;
if (constraint->raw_expr)
{
/* the expression was parsed from sql, still needs to transform */
expr = TransformConstraintExpr(constraint->raw_expr, typeName);
}
else if (constraint->cooked_expr)
{
/* expression was read from the catalog, no cooking required just parse */
expr = stringToNode(constraint->cooked_expr);
}
else
{
elog(ERROR, "missing expression for domain constraint");
}
PushOverrideEmptySearchPath(CurrentMemoryContext);
char *exprSql = deparse_expression(expr, NIL, true, true);
PopOverrideSearchPath();
appendStringInfo(buf, " CHECK (%s)", exprSql);
return;
}
case CONSTR_DEFAULT:
{
Node *expr = NULL;
if (constraint->raw_expr)
{
/* the expression was parsed from sql, still needs to transform */
expr = TransformDefaultExpr(constraint->raw_expr, domainName, typeName);
}
else if (constraint->cooked_expr)
{
/* expression was read from the catalog, no cooking required just parse */
expr = stringToNode(constraint->cooked_expr);
}
else
{
elog(ERROR, "missing expression for domain default");
}
PushOverrideEmptySearchPath(CurrentMemoryContext);
char *exprSql = deparse_expression(expr, NIL, true, true);
PopOverrideSearchPath();
appendStringInfo(buf, " DEFAULT %s", exprSql);
return;
}
case CONSTR_NOTNULL:
{
appendStringInfoString(buf, " NOT NULL");
return;
}
case CONSTR_NULL:
{
appendStringInfoString(buf, " NULL");
return;
}
default:
{
ereport(ERROR, (errmsg("unsupported constraint for distributed domain")));
}
}
}
/*
* TransformDefaultExpr transforms a default expression from the expression passed on the
* AST to a cooked version that postgres uses internally.
*
* Only the cooked version can be easily turned back into a sql string, hence its use in
* the deparser. This is only called for default expressions that don't have a cooked
* variant stored.
*/
static Node *
TransformDefaultExpr(Node *expr, List *domainName, TypeName *typeName)
{
const char *domainNameStr = NameListToQuotedString(domainName);
int32 basetypeMod = 0; /* capture typeMod during lookup */
Type tup = typenameType(NULL, typeName, &basetypeMod);
Oid basetypeoid = typeTypeId(tup);
ReleaseSysCache(tup);
ParseState *pstate = make_parsestate(NULL);
Node *defaultExpr = cookDefault(pstate, expr,
basetypeoid,
basetypeMod,
domainNameStr,
0);
return defaultExpr;
}
/*
* TransformConstraintExpr transforms a constraint expression from the expression passed
* on the AST to a cooked version that postgres uses internally.
*
* Only the cooked version can be easily turned back into a sql string, hence its use in
* the deparser. This is only called for default expressions that don't have a cooked
* variant stored.
*/
static Node *
TransformConstraintExpr(Node *expr, TypeName *typeName)
{
/*
* Convert the A_EXPR in raw_expr into an EXPR
*/
ParseState *pstate = make_parsestate(NULL);
/*
* Set up a CoerceToDomainValue to represent the occurrence of VALUE in
* the expression. Note that it will appear to have the type of the base
* type, not the domain. This seems correct since within the check
* expression, we should not assume the input value can be considered a
* member of the domain.
*/
CoerceToDomainValue *domVal = GetCoerceDomainValue(typeName);
pstate->p_pre_columnref_hook = replace_domain_constraint_value;
pstate->p_ref_hook_state = (void *) domVal;
expr = transformExpr(pstate, expr, EXPR_KIND_DOMAIN_CHECK);
/*
* Make sure it yields a boolean result.
*/
expr = coerce_to_boolean(pstate, expr, "CHECK");
/*
* Fix up collation information.
*/
assign_expr_collations(pstate, expr);
return expr;
}
/*
* GetCoerceDomainValue creates a stub CoerceToDomainValue struct representing the type
* referenced by the typeName.
*/
static CoerceToDomainValue *
GetCoerceDomainValue(TypeName *typeName)
{
int32 typMod = 0; /* capture typeMod during lookup */
Type tup = LookupTypeName(NULL, typeName, &typMod, false);
if (tup == NULL)
{
elog(ERROR, "unable to lookup type information for %s",
NameListToQuotedString(typeName->names));
}
CoerceToDomainValue *domVal = makeNode(CoerceToDomainValue);
domVal->typeId = typeTypeId(tup);
domVal->typeMod = typMod;
domVal->collation = typeTypeCollation(tup);
domVal->location = -1;
ReleaseSysCache(tup);
return domVal;
}
/* Parser pre_columnref_hook for domain CHECK constraint parsing */
static Node *
replace_domain_constraint_value(ParseState *pstate, ColumnRef *cref)
{
/*
* Check for a reference to "value", and if that's what it is, replace
* with a CoerceToDomainValue as prepared for us by domainAddConstraint.
* (We handle VALUE as a name, not a keyword, to avoid breaking a lot of
* applications that have used VALUE as a column name in the past.)
*/
if (list_length(cref->fields) == 1)
{
Node *field1 = (Node *) linitial(cref->fields);
Assert(IsA(field1, String));
char *colname = strVal(field1);
if (strcmp(colname, "value") == 0)
{
CoerceToDomainValue *domVal = copyObject(pstate->p_ref_hook_state);
/* Propagate location knowledge, if any */
domVal->location = cref->location;
return (Node *) domVal;
}
}
return NULL;
}

View File

@ -0,0 +1,257 @@
/*-------------------------------------------------------------------------
*
* qualify_domain.c
* Functions to fully qualify, make the statements independent of
* search_path settings, for all domain related statements. This
* mostly consists of adding the schema name to all the domain
* names referencing domains.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "parser/parse_type.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
static void QualifyTypeName(TypeName *typeName, bool missing_ok);
static void QualifyCollate(CollateClause *collClause, bool missing_ok);
/*
* QualifyCreateDomainStmt modifies the CreateDomainStmt passed to become search_path
* independent.
*/
void
QualifyCreateDomainStmt(Node *node)
{
CreateDomainStmt *stmt = castNode(CreateDomainStmt, node);
char *schemaName = NULL;
char *domainName = NULL;
/* fully qualify domain name */
DeconstructQualifiedName(stmt->domainname, &schemaName, &domainName);
if (!schemaName)
{
RangeVar *var = makeRangeVarFromNameList(stmt->domainname);
Oid creationSchema = RangeVarGetCreationNamespace(var);
schemaName = get_namespace_name(creationSchema);
stmt->domainname = list_make2(makeString(schemaName), makeString(domainName));
}
/* referenced types should be fully qualified */
QualifyTypeName(stmt->typeName, false);
QualifyCollate(stmt->collClause, false);
}
/*
* QualifyDropDomainStmt modifies the DropStmt for DOMAIN's to be search_path independent.
*/
void
QualifyDropDomainStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
TypeName *domainName = NULL;
foreach_ptr(domainName, stmt->objects)
{
QualifyTypeName(domainName, stmt->missing_ok);
}
}
/*
* QualifyAlterDomainStmt modifies the AlterDomainStmt to be search_path independent.
*/
void
QualifyAlterDomainStmt(Node *node)
{
AlterDomainStmt *stmt = castNode(AlterDomainStmt, node);
if (list_length(stmt->typeName) == 1)
{
TypeName *typeName = makeTypeNameFromNameList(stmt->typeName);
QualifyTypeName(typeName, false);
stmt->typeName = typeName->names;
}
}
/*
* QualifyDomainRenameConstraintStmt modifies the RenameStmt for domain constraints to be
* search_path independent.
*/
void
QualifyDomainRenameConstraintStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_DOMCONSTRAINT);
List *domainName = castNode(List, stmt->object);
if (list_length(domainName) == 1)
{
TypeName *typeName = makeTypeNameFromNameList(domainName);
QualifyTypeName(typeName, false);
stmt->object = (Node *) typeName->names;
}
}
/*
* QualifyAlterDomainOwnerStmt modifies the AlterOwnerStmt for DOMAIN's to be search_oath
* independent.
*/
void
QualifyAlterDomainOwnerStmt(Node *node)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_DOMAIN);
List *domainName = castNode(List, stmt->object);
if (list_length(domainName) == 1)
{
TypeName *typeName = makeTypeNameFromNameList(domainName);
QualifyTypeName(typeName, false);
stmt->object = (Node *) typeName->names;
}
}
/*
* QualifyRenameDomainStmt modifies the RenameStmt for the Domain to be search_path
* independent.
*/
void
QualifyRenameDomainStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_DOMAIN);
List *domainName = castNode(List, stmt->object);
if (list_length(domainName) == 1)
{
TypeName *typeName = makeTypeNameFromNameList(domainName);
QualifyTypeName(typeName, false);
stmt->object = (Node *) typeName->names;
}
}
/*
* QualifyAlterDomainSchemaStmt modifies the AlterObjectSchemaStmt to be search_path
* independent.
*/
void
QualifyAlterDomainSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_DOMAIN);
List *domainName = castNode(List, stmt->object);
if (list_length(domainName) == 1)
{
TypeName *typeName = makeTypeNameFromNameList(domainName);
QualifyTypeName(typeName, false);
stmt->object = (Node *) typeName->names;
}
}
/*
* QualifyTypeName qualifies a TypeName object in place. When missing_ok is false it might
* throw an error if the type can't be found based on its name. When an oid is provided
* missing_ok is ignored and treated as false. Meaning, even if missing_ok is true the
* function might raise an error for non-existing types if the oid can't be found.
*/
static void
QualifyTypeName(TypeName *typeName, bool missing_ok)
{
if (OidIsValid(typeName->typeOid))
{
/*
* When the typeName is provided as oid, fill in the names.
* missing_ok is ignored for oid's
*/
Type typeTup = typeidType(typeName->typeOid);
char *name = typeTypeName(typeTup);
Oid namespaceOid = TypeOidGetNamespaceOid(typeName->typeOid);
char *schemaName = get_namespace_name(namespaceOid);
typeName->names = list_make2(makeString(schemaName), makeString(name));
ReleaseSysCache(typeTup);
}
else
{
char *name = NULL;
char *schemaName = NULL;
DeconstructQualifiedName(typeName->names, &schemaName, &name);
if (!schemaName)
{
Oid typeOid = LookupTypeNameOid(NULL, typeName, missing_ok);
Oid namespaceOid = TypeOidGetNamespaceOid(typeOid);
schemaName = get_namespace_name(namespaceOid);
typeName->names = list_make2(makeString(schemaName), makeString(name));
}
}
}
/*
* QualifyCollate qualifies any given CollateClause by adding any missing schema name to
* the collation being identified.
*
* If collClause is a NULL pointer this function is a no-nop.
*/
static void
QualifyCollate(CollateClause *collClause, bool missing_ok)
{
if (collClause == NULL)
{
/* no collate clause, nothing to qualify*/
return;
}
if (list_length(collClause->collname) != 1)
{
/* already qualified */
return;
}
Oid collOid = get_collation_oid(collClause->collname, missing_ok);
ObjectAddress collationAddress = { 0 };
ObjectAddressSet(collationAddress, CollationRelationId, collOid);
List *objName = NIL;
List *objArgs = NIL;
#if PG_VERSION_NUM >= PG_VERSION_14
getObjectIdentityParts(&collationAddress, &objName, &objArgs, false);
#else
getObjectIdentityParts(&collationAddress, &objName, &objArgs);
#endif
collClause->collname = NIL;
char *name = NULL;
foreach_ptr(name, objName)
{
collClause->collname = lappend(collClause->collname, makeString(name));
}
}

View File

@ -31,13 +31,10 @@
#include "utils/syscache.h"
#include "utils/lsyscache.h"
static char * GetTypeNamespaceNameByNameList(List *names);
static Oid TypeOidGetNamespaceOid(Oid typeOid);
/*
* GetTypeNamespaceNameByNameList resolved the schema name of a type by its namelist.
*/
static char *
char *
GetTypeNamespaceNameByNameList(List *names)
{
TypeName *typeName = makeTypeNameFromNameList(names);
@ -51,7 +48,7 @@ GetTypeNamespaceNameByNameList(List *names)
/*
* TypeOidGetNamespaceOid resolves the namespace oid for a type identified by its type oid
*/
static Oid
Oid
TypeOidGetNamespaceOid(Oid typeOid)
{
HeapTuple typeTuple = SearchSysCache1(TYPEOID, typeOid);

View File

@ -763,6 +763,11 @@ GetObjectTypeString(ObjectType objType)
return "database";
}
case OBJECT_DOMAIN:
{
return "domain";
}
case OBJECT_EXTENSION:
{
return "extension";

View File

@ -10,6 +10,7 @@
#include "postgres.h"
#include "distributed/commands.h"
#include "distributed/pg_version_constants.h"
#include "access/genam.h"
@ -21,6 +22,7 @@
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc_d.h"
@ -127,6 +129,7 @@ static List * GetRelationTriggerFunctionDependencyList(Oid relationId);
static List * GetRelationStatsSchemaDependencyList(Oid relationId);
static List * GetRelationIndicesDependencyList(Oid relationId);
static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid objectId);
static List * GetTypeConstraintDependencyDefinition(Oid typeId);
static List * CreateObjectAddressDependencyDefList(Oid classId, List *objectIdList);
static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition);
@ -630,6 +633,15 @@ SupportedDependencyByCitus(const ObjectAddress *address)
return IsObjectAddressOwnedByExtension(address, NULL);
}
case OCLASS_CONSTRAINT:
{
/*
* Constraints are only supported when on domain types. Other constraints have
* their typid set to InvalidOid.
*/
return OidIsValid(get_constraint_typid(address->objectId));
}
case OCLASS_COLLATION:
{
return true;
@ -691,6 +703,7 @@ SupportedDependencyByCitus(const ObjectAddress *address)
{
case TYPTYPE_ENUM:
case TYPTYPE_COMPOSITE:
case TYPTYPE_DOMAIN:
{
return true;
}
@ -1196,17 +1209,36 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
{
case TypeRelationId:
{
/*
* types depending on other types are not captured in pg_depend, instead they
* are described with their dependencies by the relation that describes the
* composite type.
*/
if (get_typtype(target.objectId) == TYPTYPE_COMPOSITE)
switch (get_typtype(target.objectId))
{
Oid typeRelationId = get_typ_typrelid(target.objectId);
DependencyDefinition *dependency =
CreateObjectAddressDependencyDef(RelationRelationId, typeRelationId);
result = lappend(result, dependency);
/*
* types depending on other types are not captured in pg_depend, instead
* they are described with their dependencies by the relation that
* describes the composite type.
*/
case TYPTYPE_COMPOSITE:
{
Oid typeRelationId = get_typ_typrelid(target.objectId);
DependencyDefinition *dependency =
CreateObjectAddressDependencyDef(RelationRelationId,
typeRelationId);
result = lappend(result, dependency);
break;
}
/*
* Domains can have constraints associated with them. Constraints themself
* can depend on things like functions. To support the propagation of
* these functions we will add the constraints to the list of objects to
* be created.
*/
case TYPTYPE_DOMAIN:
{
List *dependencies =
GetTypeConstraintDependencyDefinition(target.objectId);
result = list_concat(result, dependencies);
break;
}
}
/*
@ -1381,6 +1413,49 @@ GetRelationTriggerFunctionDependencyList(Oid relationId)
}
/*
* GetTypeConstraintDependencyDefinition creates a list of constraint dependency
* definitions for a given type
*/
static List *
GetTypeConstraintDependencyDefinition(Oid typeId)
{
/* lookup and look all constraints to add them to the CreateDomainStmt */
Relation conRel = table_open(ConstraintRelationId, AccessShareLock);
/* Look for CHECK Constraints on this domain */
ScanKeyData key[1];
ScanKeyInit(&key[0],
Anum_pg_constraint_contypid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(typeId));
SysScanDesc scan = systable_beginscan(conRel, ConstraintTypidIndexId, true, NULL, 1,
key);
List *dependencies = NIL;
HeapTuple conTup = NULL;
while (HeapTupleIsValid(conTup = systable_getnext(scan)))
{
Form_pg_constraint c = (Form_pg_constraint) GETSTRUCT(conTup);
if (c->contype != CONSTRAINT_CHECK)
{
/* Ignore non-CHECK constraints, shouldn't be any */
continue;
}
dependencies = lappend(dependencies, CreateObjectAddressDependencyDef(
ConstraintRelationId, c->oid));
}
systable_endscan(scan);
table_close(conRel, NoLock);
return dependencies;
}
/*
* CreateObjectAddressDependencyDef returns DependencyDefinition object that
* stores the ObjectAddress for the database object identified by classId and

View File

@ -171,6 +171,35 @@ extern List * PostprocessAlterDatabaseOwnerStmt(Node *node, const char *queryStr
extern ObjectAddress AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok);
extern List * DatabaseOwnerDDLCommands(const ObjectAddress *address);
/* domain.c - forward declarations */
extern List * PreprocessCreateDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessCreateDomainStmt(Node *node, const char *queryString);
extern List * PreprocessDropDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterDomainStmt(Node *node, const char *queryString);
extern List * PreprocessDomainRenameConstraintStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterDomainOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterDomainOwnerStmt(Node *node, const char *queryString);
extern List * PreprocessRenameDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterDomainSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterDomainSchemaStmt(Node *node, const char *queryString);
extern ObjectAddress CreateDomainStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterDomainStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress DomainRenameConstraintStmtObjectAddress(Node *node,
bool missing_ok);
extern ObjectAddress AlterDomainOwnerStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress RenameDomainStmtObjectAddress(Node *node, bool missing_ok);
extern CreateDomainStmt * RecreateDomainStmt(Oid domainOid);
extern Oid get_constraint_typid(Oid conoid);
/* extension.c - forward declarations */
extern bool IsDropCitusExtensionStmt(Node *parsetree);
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);

View File

@ -50,6 +50,23 @@ extern void QualifyRenameCollationStmt(Node *stmt);
extern void QualifyAlterCollationSchemaStmt(Node *stmt);
extern void QualifyAlterCollationOwnerStmt(Node *stmt);
/* forward declarations for deparse_domain_stmts.c */
extern char * DeparseCreateDomainStmt(Node *node);
extern char * DeparseDropDomainStmt(Node *node);
extern char * DeparseAlterDomainStmt(Node *node);
extern char * DeparseDomainRenameConstraintStmt(Node *node);
extern char * DeparseAlterDomainOwnerStmt(Node *node);
extern char * DeparseRenameDomainStmt(Node *node);
extern char * DeparseAlterDomainSchemaStmt(Node *node);
extern void QualifyCreateDomainStmt(Node *node);
extern void QualifyDropDomainStmt(Node *node);
extern void QualifyAlterDomainStmt(Node *node);
extern void QualifyDomainRenameConstraintStmt(Node *node);
extern void QualifyAlterDomainOwnerStmt(Node *node);
extern void QualifyRenameDomainStmt(Node *node);
extern void QualifyAlterDomainSchemaStmt(Node *node);
/* forward declarations for deparse_foreign_server_stmts.c */
extern char * DeparseCreateForeignServerStmt(Node *node);
extern char * DeparseAlterForeignServerStmt(Node *node);
@ -121,6 +138,9 @@ extern void QualifyCreateEnumStmt(Node *stmt);
extern void QualifyAlterTypeSchemaStmt(Node *stmt);
extern void QualifyAlterTypeOwnerStmt(Node *stmt);
extern char * GetTypeNamespaceNameByNameList(List *names);
extern Oid TypeOidGetNamespaceOid(Oid typeOid);
extern ObjectAddress GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok);
extern ObjectAddress RenameAttributeStmtObjectAddress(Node *stmt, bool missing_ok);

View File

@ -133,18 +133,8 @@ CREATE TYPE nested_composite_type AS (
a composite_type,
b composite_type
);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
(1 row)
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
(1 row)
CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type;
CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type;
INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type);
SELECT col FROM composite_type_table;
col
@ -220,18 +210,8 @@ CREATE TYPE binaryless_composite_type AS (
a aclitem,
b aclitem
);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
(1 row)
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
(1 row)
CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem;
CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type;
INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test');
SELECT col1 FROM binaryless_builtin;
col1

View File

@ -589,30 +589,13 @@ SELECT count(*) FROM coordinator_evaluation_table_2 WHERE key = 101;
CREATE TYPE comptype_int as (int_a int);
CREATE DOMAIN domain_comptype_int AS comptype_int CHECK ((VALUE).int_a > 0);
-- citus does not propagate domain types
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below.
SELECT run_command_on_workers(
$$
CREATE DOMAIN coordinator_evaluation.domain_comptype_int AS coordinator_evaluation.comptype_int CHECK ((VALUE).int_a > 0)
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE DOMAIN")
(localhost,57638,t,"CREATE DOMAIN")
(2 rows)
CREATE TABLE reference_table(column_a coordinator_evaluation.domain_comptype_int);
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
RESET citus.enable_metadata_sync;
INSERT INTO reference_table (column_a) VALUES ('(1)');
INSERT INTO reference_table (column_a) VALUES ('(2)'), ('(3)');
INSERT INTO reference_table VALUES ('(4)'), ('(5)');

View File

@ -0,0 +1,947 @@
CREATE SCHEMA distributed_domain;
SET search_path TO distributed_domain;
SET citus.next_shard_id TO 93631000;
-- verify domain is not already present on workers
SELECT * FROM run_command_on_workers($$
SELECT 'distributed_domain.age'::regtype;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: type "distributed_domain.age" does not exist
localhost | 57638 | f | ERROR: type "distributed_domain.age" does not exist
(2 rows)
CREATE DOMAIN age AS int CHECK( VALUE >= 0 );
-- check domain exists on workers to proof the domain got propagated
SELECT * FROM run_command_on_workers($$
SELECT 'distributed_domain.age'::regtype;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | distributed_domain.age
localhost | 57638 | t | distributed_domain.age
(2 rows)
-- verify the constraint triggers when operations that conflict are pushed to the shards
CREATE TABLE foo (a int);
SELECT create_distributed_table('foo', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE bar (a age);
SELECT create_distributed_table('bar', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO foo (a) VALUES (-1); -- foo can have negative values
INSERT INTO bar (a) SELECT a FROM foo; -- bar cannot
ERROR: value for domain distributed_domain.age violates check constraint "age_check"
CONTEXT: while executing command on localhost:xxxxx
DROP TABLE bar; -- need to drop this one directly, there is currently a bug in drop schema cascade when it drops both a table and the type of the distribution column at the same time
-- create a domain that is not propagated
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN us_postal_code AS TEXT
CHECK(
VALUE ~ '^\d{5}$'
OR VALUE ~ '^\d{5}-\d{4}$'
);
RESET citus.enable_ddl_propagation;
-- and use in distributed table to trigger domain propagation
CREATE TABLE us_snail_addy (
address_id SERIAL PRIMARY KEY,
street1 TEXT NOT NULL,
street2 TEXT,
street3 TEXT,
city TEXT NOT NULL,
postal us_postal_code NOT NULL
);
SELECT create_distributed_table('us_snail_addy', 'address_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- defaults are marked as a constraint on the statement, which makes it interesting for deparsing, so some extensive
-- test coverage to make sure it works in all strange cases.
CREATE SCHEMA distributed_domain_constraints;
SET search_path TO distributed_domain_constraints;
CREATE DOMAIN with_default AS int DEFAULT 0;
CREATE DOMAIN age_with_default AS int DEFAULT 0 CHECK (value > 0);
CREATE DOMAIN age_with_default2 AS int CHECK (value > 0) DEFAULT 0;
CREATE DOMAIN age_with_default3 AS int CHECK (value > 0) DEFAULT NULL;
CREATE DOMAIN age_with_default4 AS int NOT NULL CHECK (value > 0) DEFAULT NULL;
-- test casting with worker queries
-- should simply work, has no check constraints
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.with_default;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t |
localhost | 57638 | t |
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.with_default;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 0
localhost | 57638 | t | 0
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.with_default;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
-- has a constraint where the number needs to be greater than 0
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t |
localhost | 57638 | t |
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain distributed_domain_constraints.age_with_default violates check constraint "age_with_default_check"
localhost | 57638 | f | ERROR: value for domain distributed_domain_constraints.age_with_default violates check constraint "age_with_default_check"
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
-- has a constraint where the number needs to be greater than 0
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default2;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t |
localhost | 57638 | t |
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default2;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain distributed_domain_constraints.age_with_default2 violates check constraint "age_with_default2_check"
localhost | 57638 | f | ERROR: value for domain distributed_domain_constraints.age_with_default2 violates check constraint "age_with_default2_check"
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default2;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
-- has a constraint where the number needs to be greater than 0
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default3;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t |
localhost | 57638 | t |
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default3;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain distributed_domain_constraints.age_with_default3 violates check constraint "age_with_default3_check"
localhost | 57638 | f | ERROR: value for domain distributed_domain_constraints.age_with_default3 violates check constraint "age_with_default3_check"
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default3;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
-- has a constraint where the number needs to be greater than 0 and not null
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default4;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: domain distributed_domain_constraints.age_with_default4 does not allow null values
localhost | 57638 | f | ERROR: domain distributed_domain_constraints.age_with_default4 does not allow null values
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default4;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain distributed_domain_constraints.age_with_default4 violates check constraint "age_with_default4_check"
localhost | 57638 | f | ERROR: value for domain distributed_domain_constraints.age_with_default4 violates check constraint "age_with_default4_check"
(2 rows)
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default4;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
-- test usage in distributed tables
-- we use all domains defined earlier and insert all possible values, NULL, default, non-violation, violation. Some of
-- the default values will violate a constraint.
-- Based on the constraints on the domain we see different errors and will end up with a final set of data stored.
CREATE TABLE use_default (a int, b with_default);
SELECT create_distributed_table('use_default', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_default (a, b) VALUES (0, NULL);
INSERT INTO use_default (a) VALUES (1);
INSERT INTO use_default (a, b) VALUES (2, 1);
INSERT INTO use_default (a, b) VALUES (3, -1);
SELECT * FROM use_default ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
1 | 0
2 | 1
3 | -1
(4 rows)
CREATE TABLE use_age_default (a int, b age_with_default);
SELECT create_distributed_table('use_age_default', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default (a, b) VALUES (0, NULL);
INSERT INTO use_age_default (a) VALUES (1);
ERROR: value for domain distributed_domain_constraints.age_with_default violates check constraint "age_with_default_check"
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default (a, b) VALUES (2, 1);
INSERT INTO use_age_default (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default violates check constraint "age_with_default_check"
CONTEXT: while executing command on localhost:xxxxx
-- also load some data with copy to verify coercions.
\COPY use_age_default FROM STDIN DELIMITER AS ',';
ERROR: value for domain age_with_default violates check constraint "age_with_default_check"
CONTEXT: COPY use_age_default, line 1, column b: " -1"
\COPY use_age_default FROM STDIN DELIMITER AS ',';
SELECT * FROM use_age_default ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
2 | 1
5 | 1
(3 rows)
CREATE TABLE use_age_default2 (a int, b age_with_default2);
SELECT create_distributed_table('use_age_default2', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default2 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default2 (a) VALUES (1);
ERROR: value for domain distributed_domain_constraints.age_with_default2 violates check constraint "age_with_default2_check"
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default2 (a, b) VALUES (2, 1);
INSERT INTO use_age_default2 (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default2 violates check constraint "age_with_default2_check"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_default2 ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
2 | 1
(2 rows)
CREATE TABLE use_age_default3 (a int, b age_with_default3);
SELECT create_distributed_table('use_age_default3', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default3 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default3 (a) VALUES (1);
INSERT INTO use_age_default3 (a, b) VALUES (2, 1);
INSERT INTO use_age_default3 (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default3 violates check constraint "age_with_default3_check"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_default3 ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
1 |
2 | 1
(3 rows)
CREATE TABLE use_age_default4 (a int, b age_with_default4);
SELECT create_distributed_table('use_age_default4', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default4 (a, b) VALUES (0, NULL);
ERROR: domain distributed_domain_constraints.age_with_default4 does not allow null values
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default4 (a) VALUES (1);
ERROR: domain distributed_domain_constraints.age_with_default4 does not allow null values
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default4 (a, b) VALUES (2, 1);
INSERT INTO use_age_default4 (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default4 violates check constraint "age_with_default4_check"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_default4 ORDER BY a;
a | b
---------------------------------------------------------------------
2 | 1
(1 row)
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain_constraints CASCADE;
RESET client_min_messages;
-- same tests as above, with with just in time propagation of domains
CREATE SCHEMA distributed_domain_constraints;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN with_default AS int DEFAULT 0;
CREATE DOMAIN age_with_default AS int DEFAULT 0 CHECK (value > 0);
CREATE DOMAIN age_with_default2 AS int CHECK (value > 0) DEFAULT 0;
CREATE DOMAIN age_with_default3 AS int CHECK (value > 0) DEFAULT NULL;
CREATE DOMAIN age_with_default4 AS int NOT NULL CHECK (value > 0) DEFAULT NULL;
RESET citus.enable_ddl_propagation;
-- use all domains in tables to get them propagated
CREATE TABLE use_default (a int, b with_default);
SELECT create_distributed_table('use_default', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_default (a, b) VALUES (0, NULL);
INSERT INTO use_default (a) VALUES (1);
INSERT INTO use_default (a, b) VALUES (2, 1);
INSERT INTO use_default (a, b) VALUES (3, -1);
SELECT * FROM use_default ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
1 | 0
2 | 1
3 | -1
(4 rows)
CREATE TABLE use_age_default (a int, b age_with_default);
SELECT create_distributed_table('use_age_default', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default (a, b) VALUES (0, NULL);
INSERT INTO use_age_default (a) VALUES (1);
ERROR: value for domain distributed_domain_constraints.age_with_default violates check constraint "age_with_default_check"
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default (a, b) VALUES (2, 1);
INSERT INTO use_age_default (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default violates check constraint "age_with_default_check"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_default ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
2 | 1
(2 rows)
CREATE TABLE use_age_default2 (a int, b age_with_default2);
SELECT create_distributed_table('use_age_default2', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default2 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default2 (a) VALUES (1);
ERROR: value for domain distributed_domain_constraints.age_with_default2 violates check constraint "age_with_default2_check"
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default2 (a, b) VALUES (2, 1);
INSERT INTO use_age_default2 (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default2 violates check constraint "age_with_default2_check"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_default2 ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
2 | 1
(2 rows)
CREATE TABLE use_age_default3 (a int, b age_with_default3);
SELECT create_distributed_table('use_age_default3', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default3 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default3 (a) VALUES (1);
INSERT INTO use_age_default3 (a, b) VALUES (2, 1);
INSERT INTO use_age_default3 (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default3 violates check constraint "age_with_default3_check"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_default3 ORDER BY a;
a | b
---------------------------------------------------------------------
0 |
1 |
2 | 1
(3 rows)
CREATE TABLE use_age_default4 (a int, b age_with_default4);
SELECT create_distributed_table('use_age_default4', 'a', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_default4 (a, b) VALUES (0, NULL);
ERROR: domain distributed_domain_constraints.age_with_default4 does not allow null values
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default4 (a) VALUES (1);
ERROR: domain distributed_domain_constraints.age_with_default4 does not allow null values
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_age_default4 (a, b) VALUES (2, 1);
INSERT INTO use_age_default4 (a, b) VALUES (3, -1);
ERROR: value for domain distributed_domain_constraints.age_with_default4 violates check constraint "age_with_default4_check"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_default4 ORDER BY a;
a | b
---------------------------------------------------------------------
2 | 1
(1 row)
-- clean up
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain_constraints CASCADE;
RESET client_min_messages;
CREATE SCHEMA postgres_domain_examples;
SET search_path TO postgres_domain_examples;
-- make sure the function gets automatically propagated when we propagate the domain
SET citus.enable_ddl_propagation TO off;
create function sql_is_distinct_from(anyelement, anyelement)
returns boolean language sql
as 'select $1 is distinct from $2 limit 1';
RESET citus.enable_ddl_propagation;
CREATE DOMAIN inotnull int
CHECK (sql_is_distinct_from(value, null));
SELECT * FROM run_command_on_workers($$ SELECT 1::postgres_domain_examples.inotnull; $$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT null::postgres_domain_examples.inotnull; $$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain postgres_domain_examples.inotnull violates check constraint "inotnull_check"
localhost | 57638 | f | ERROR: value for domain postgres_domain_examples.inotnull violates check constraint "inotnull_check"
(2 rows)
-- create a domain with sql function as a default value
SET citus.enable_ddl_propagation TO off;
create function random_between(min int, max int)
returns int language sql
as 'SELECT round(random()*($2-$1))+$1;';
RESET citus.enable_ddl_propagation;
-- this verifies the function in the default expression is found and distributed, otherwise the creation of the domain would fail on the workers.
CREATE DOMAIN with_random_default int DEFAULT random_between(100, 200);
SET client_min_messages TO warning;
DROP SCHEMA postgres_domain_examples CASCADE;
RESET client_min_messages;
SET search_path TO distributed_domain;
-- verify drops are propagated
CREATE DOMAIN will_drop AS text DEFAULT 'foo';
DROP DOMAIN will_drop;
-- verify domain is dropped from workers
SELECT * FROM run_command_on_workers($$ SELECT 'dropped?'::distributed_domain.will_drop; $$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: type "distributed_domain.will_drop" does not exist
localhost | 57638 | f | ERROR: type "distributed_domain.will_drop" does not exist
(2 rows)
-- verify the type modifiers are deparsed correctly, both for direct propagation as well as on demand propagation
CREATE DOMAIN varcharmod AS varchar(3);
SELECT * FROM run_command_on_workers($$ SELECT '12345'::distributed_domain.varcharmod; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 123
localhost | 57638 | t | 123
(2 rows)
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN varcharmod_ondemand AS varchar(3);
RESET citus.enable_ddl_propagation;
CREATE TABLE use_varcharmod_ondemand (a int, b varcharmod_ondemand);
SELECT create_distributed_table('use_varcharmod_ondemand', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- epxected error due to value being too long for varchar(3)
INSERT INTO use_varcharmod_ondemand VALUES (1,'12345');
ERROR: value too long for type character varying(3)
SELECT * FROM run_command_on_workers($$ SELECT '12345'::distributed_domain.varcharmod_ondemand; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 123
localhost | 57638 | t | 123
(2 rows)
-- section testing default altering
CREATE DOMAIN alter_default AS text DEFAULT 'foo';
CREATE TABLE use_alter_default (a int, b alter_default);
SELECT create_distributed_table('use_alter_default', 'a', shard_count => 4);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_alter_default (a) VALUES (1);
ALTER DOMAIN alter_default SET DEFAULT 'bar';
INSERT INTO use_alter_default (a) VALUES (2);
ALTER DOMAIN alter_default DROP DEFAULT;
INSERT INTO use_alter_default (a) VALUES (3);
SELECT * FROM use_alter_default ORDER BY 1,2;
a | b
---------------------------------------------------------------------
1 | foo
2 | bar
3 |
(3 rows)
-- add new dependency while adding default
CREATE DOMAIN add_default_with_function AS int;
SET citus.enable_ddl_propagation TO off;
create function random_between(min int, max int)
returns int language sql
as 'SELECT round(random()*($2-$1))+$1;';
RESET citus.enable_ddl_propagation;
ALTER DOMAIN add_default_with_function SET DEFAULT random_between(100, 200);
CREATE TABLE use_add_default_with_function (a int, b add_default_with_function);
SELECT create_distributed_table('use_add_default_with_function', 'a', shard_count => 4);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_add_default_with_function (a) VALUES (1);
-- altering NULL/NOT NULL
CREATE DOMAIN alter_null AS int;
CREATE TABLE use_alter_null (a int, b alter_null);
SELECT create_distributed_table('use_alter_null', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_alter_null (a) VALUES (1);
ALTER DOMAIN alter_null SET NOT NULL;
ERROR: column "b" of table "use_alter_null_93631040" contains null values
CONTEXT: while executing command on localhost:xxxxx
TRUNCATE use_alter_null;
ALTER DOMAIN alter_null SET NOT NULL;
INSERT INTO use_alter_null (a) VALUES (2);
ERROR: domain distributed_domain.alter_null does not allow null values
CONTEXT: while executing command on localhost:xxxxx
ALTER DOMAIN alter_null DROP NOT NULL;
INSERT INTO use_alter_null (a) VALUES (3);
SELECT * FROM use_alter_null ORDER BY 1;
a | b
---------------------------------------------------------------------
3 |
(1 row)
-- testing adding/dropping constraints
SET citus.enable_ddl_propagation TO off;
create function sql_is_distinct_from(anyelement, anyelement)
returns boolean language sql
as 'select $1 is distinct from $2 limit 1';
RESET citus.enable_ddl_propagation;
CREATE DOMAIN alter_add_constraint int;
ALTER DOMAIN alter_add_constraint ADD CONSTRAINT check_distinct CHECK (sql_is_distinct_from(value, null));
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.alter_add_constraint; $$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT null::distributed_domain.alter_add_constraint; $$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain distributed_domain.alter_add_constraint violates check constraint "check_distinct"
localhost | 57638 | f | ERROR: value for domain distributed_domain.alter_add_constraint violates check constraint "check_distinct"
(2 rows)
ALTER DOMAIN alter_add_constraint DROP CONSTRAINT check_distinct;
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.alter_add_constraint; $$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT null::distributed_domain.alter_add_constraint; $$);
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t |
localhost | 57638 | t |
(2 rows)
ALTER DOMAIN alter_add_constraint DROP CONSTRAINT IF EXISTS check_distinct;
NOTICE: constraint "check_distinct" of domain "distributed_domain.alter_add_constraint" does not exist, skipping
ALTER DOMAIN alter_add_constraint ADD CONSTRAINT check_distinct CHECK (sql_is_distinct_from(value, null));
ALTER DOMAIN alter_add_constraint RENAME CONSTRAINT check_distinct TO check_distinct_renamed;
ALTER DOMAIN alter_add_constraint DROP CONSTRAINT check_distinct_renamed;
-- test validating invalid constraints
CREATE DOMAIN age_invalid AS int NOT NULL DEFAULT 0;
CREATE TABLE use_age_invalid (a int, b age_invalid);
SELECT create_distributed_table('use_age_invalid', 'a', shard_count => 4);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_age_invalid VALUES (1,1), (2, 2), (3, 0), (4, -1);
ALTER DOMAIN age_invalid ADD CONSTRAINT check_age_positive CHECK (value>=0) NOT VALID;
-- should fail, even though constraint is not valid
INSERT INTO use_age_invalid VALUES (5,-1);
ERROR: value for domain distributed_domain.age_invalid violates check constraint "check_age_positive"
CONTEXT: while executing command on localhost:xxxxx
-- reading violating data of an non-valid constraint errors in citus
SELECT * FROM use_age_invalid ORDER BY 1;
ERROR: value for domain age_invalid violates check constraint "check_age_positive"
-- should fail since there is data in the table that violates the check
ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
ERROR: column "b" of table "use_age_invalid_93631045" contains values that violate the new constraint
CONTEXT: while executing command on localhost:xxxxx
DELETE FROM use_age_invalid WHERE b < 0;
-- should succeed now since the violating data has been removed
ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
-- still fails for constraint
INSERT INTO use_age_invalid VALUES (5,-1);
ERROR: value for domain distributed_domain.age_invalid violates check constraint "check_age_positive"
CONTEXT: while executing command on localhost:xxxxx
SELECT * FROM use_age_invalid ORDER BY 1;
a | b
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 0
(3 rows)
-- verify we can validate a constraint that is already validated, can happen when we add a node while a domain constraint was not validated
ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
-- test changing the owner of a domain
SET client_min_messages TO error;
SELECT 1 FROM run_command_on_workers($$ CREATE ROLE domain_owner; $$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
CREATE ROLE domain_owner;
RESET client_min_messages;
CREATE DOMAIN alter_domain_owner AS int;
ALTER DOMAIN alter_domain_owner OWNER TO domain_owner;
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
rolname
---------------------------------------------------------------------
domain_owner
(1 row)
SELECT * FROM run_command_on_workers($$
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | domain_owner
localhost | 57638 | t | domain_owner
(2 rows)
DROP DOMAIN alter_domain_owner;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN alter_domain_owner AS int;
ALTER DOMAIN alter_domain_owner OWNER TO domain_owner;
RESET citus.enable_ddl_propagation;
CREATE TABLE use_alter_domain_owner (a int, b alter_domain_owner);
SELECT create_distributed_table('use_alter_domain_owner', 'a', shard_count => 4);
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
rolname
---------------------------------------------------------------------
domain_owner
(1 row)
SELECT * FROM run_command_on_workers($$
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | domain_owner
localhost | 57638 | t | domain_owner
(2 rows)
-- rename the domain
ALTER DOMAIN alter_domain_owner RENAME TO renamed_domain;
SELECT * FROM run_command_on_workers($$ SELECT NULL::distributed_domain.renamed_domain; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t |
localhost | 57638 | t |
(2 rows)
-- move schema
SET citus.enable_ddl_propagation TO off;
CREATE SCHEMA distributed_domain_moved;
RESET citus.enable_ddl_propagation;
ALTER DOMAIN renamed_domain SET SCHEMA distributed_domain_moved;
-- test collation
CREATE COLLATION german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
CREATE DOMAIN with_collation AS text COLLATE german_phonebook NOT NULL;
SELECT run_command_on_workers($$ SELECT typcollation::regcollation FROM pg_type WHERE oid = 'distributed_domain.with_collation'::regtype; $$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,distributed_domain.german_phonebook)
(localhost,57638,t,distributed_domain.german_phonebook)
(2 rows)
DROP DOMAIN with_collation;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN with_collation AS text COLLATE german_phonebook NOT NULL;
RESET citus.enable_ddl_propagation;
CREATE TABLE use_with_collation (a int, b with_collation);
SELECT create_reference_table('use_with_collation');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT run_command_on_workers($$ SELECT typcollation::regcollation FROM pg_type WHERE oid = 'distributed_domain.with_collation'::regtype; $$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,distributed_domain.german_phonebook)
(localhost,57638,t,distributed_domain.german_phonebook)
(2 rows)
INSERT INTO use_with_collation VALUES (1, U&'\00E4sop'), (2, 'Vossr');
SELECT * FROM use_with_collation WHERE b < 'b';
a | b
---------------------------------------------------------------------
1 | äsop
(1 row)
-- test domain backed by array
CREATE DOMAIN domain_array AS int[] NOT NULL CHECK (array_length(value,1) >= 2);
SELECT * FROM run_command_on_workers($$ SELECT NULL::distributed_domain.domain_array; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: domain distributed_domain.domain_array does not allow null values
localhost | 57638 | f | ERROR: domain distributed_domain.domain_array does not allow null values
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT ARRAY[1]::distributed_domain.domain_array; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain distributed_domain.domain_array violates check constraint "domain_array_check"
localhost | 57638 | f | ERROR: value for domain distributed_domain.domain_array violates check constraint "domain_array_check"
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT ARRAY[1,2]::distributed_domain.domain_array; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | {1,2}
localhost | 57638 | t | {1,2}
(2 rows)
DROP DOMAIN domain_array;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN domain_array AS int[] NOT NULL CHECK (array_length(value,1) >= 2);
RESET citus.enable_ddl_propagation;
CREATE TABLE use_domain_array (a int, b domain_array);
SELECT create_distributed_table('use_domain_array', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO use_domain_array VALUES (1, NULL);
ERROR: domain distributed_domain.domain_array does not allow null values
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_domain_array VALUES (2, ARRAY[1]);
ERROR: value for domain distributed_domain.domain_array violates check constraint "domain_array_check"
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO use_domain_array VALUES (3, ARRAY[1,2]);
SELECT * FROM use_domain_array ORDER BY 1;
a | b
---------------------------------------------------------------------
3 | {1,2}
(1 row)
-- add nameless constraint
CREATE DOMAIN nameless_constraint AS int;
ALTER DOMAIN nameless_constraint ADD CHECK (value > 0);
SELECT * FROM run_command_on_workers($$ SELECT NULL::distributed_domain.nameless_constraint; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t |
localhost | 57638 | t |
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.nameless_constraint; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT (-1)::distributed_domain.nameless_constraint; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: value for domain distributed_domain.nameless_constraint violates check constraint "nameless_constraint_check"
localhost | 57638 | f | ERROR: value for domain distributed_domain.nameless_constraint violates check constraint "nameless_constraint_check"
(2 rows)
-- Test domains over domains
create domain vchar4 varchar(4);
create domain dinter vchar4 check (substring(VALUE, 1, 1) = 'x');
create domain dtop dinter check (substring(VALUE, 2, 1) = '1');
create table dtest(f1 dtop, id bigserial);
SELECT create_distributed_table('dtest', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
insert into dtest values('x123');
insert into dtest values('123');
ERROR: value for domain dtop violates check constraint "dinter_check"
insert into dtest values('x234');
ERROR: value for domain dtop violates check constraint "dtop_check"
DROP TABLE dtest;
DROP DOMAIN IF EXISTS dtop;
DROP DOMAIN vchar4;
ERROR: cannot drop type vchar4 because other objects depend on it
DETAIL: type dinter depends on type vchar4
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DROP DOMAIN vchar4 CASCADE;
NOTICE: drop cascades to type dinter
-- drop multiple domains at once, for which one is not distributed
CREATE DOMAIN domain1 AS int;
CREATE DOMAIN domain2 AS text;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN domain3 AS text;
RESET citus.enable_ddl_propagation;
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.domain1; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain2; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain3; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: type "distributed_domain.domain3" does not exist
localhost | 57638 | f | ERROR: type "distributed_domain.domain3" does not exist
(2 rows)
DROP DOMAIN domain1, domain2, domain3;
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.domain1; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: type "distributed_domain.domain1" does not exist
localhost | 57638 | f | ERROR: type "distributed_domain.domain1" does not exist
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain2; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: type "distributed_domain.domain2" does not exist
localhost | 57638 | f | ERROR: type "distributed_domain.domain2" does not exist
(2 rows)
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain3; $$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: type "distributed_domain.domain3" does not exist
localhost | 57638 | f | ERROR: type "distributed_domain.domain3" does not exist
(2 rows)
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE;

View File

@ -414,30 +414,13 @@ ERROR: inserting or modifying composite type fields is not supported
HINT: Use the column name to insert or update the composite type as a single value
CREATE TYPE two_ints as (if1 int, if2 int);
CREATE DOMAIN domain AS two_ints CHECK ((VALUE).if1 > 0);
-- citus does not propagate domain objects
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below.
SELECT run_command_on_workers(
$$
CREATE DOMAIN type_tests.domain AS type_tests.two_ints CHECK ((VALUE).if1 > 0);
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE DOMAIN")
(localhost,57638,t,"CREATE DOMAIN")
(2 rows)
CREATE TABLE domain_indirection_test (f1 int, f3 domain, domain_array domain[]);
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
SELECT create_distributed_table('domain_indirection_test', 'f1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
RESET citus.enable_metadata_sync;
-- not supported (field indirection to underlying composite type)
INSERT INTO domain_indirection_test (f1,f3.if1, f3.if2) VALUES (0, 1, 2);
ERROR: inserting or modifying composite type fields is not supported

View File

@ -1049,31 +1049,16 @@ SELECT key, value FROM text_partition_column_table ORDER BY key;
DROP TABLE text_partition_column_table;
-- Domain type columns can give issues
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below.
SELECT run_command_on_workers($$
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$')
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE DOMAIN")
(localhost,57638,t,"CREATE DOMAIN")
(2 rows)
CREATE TABLE domain_partition_column_table (
key test_key NOT NULL,
value int
);
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
SELECT create_distributed_table('domain_partition_column_table', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
RESET citus.enable_metadata_sync;
PREPARE prepared_coercion_to_domain_insert(text) AS
INSERT INTO domain_partition_column_table VALUES ($1, 1);
EXECUTE prepared_coercion_to_domain_insert('test-1');

View File

@ -20,16 +20,15 @@ SELECT key, value FROM text_partition_column_table ORDER BY key;
test | 1
(7 rows)
-- TODO: Uncomment tests below once domains are supported
-- PREPARE prepared_coercion_to_domain_insert(text) AS
-- INSERT INTO domain_partition_column_table VALUES ($1, 1);
-- EXECUTE prepared_coercion_to_domain_insert('test-1');
-- EXECUTE prepared_coercion_to_domain_insert('test-2');
-- EXECUTE prepared_coercion_to_domain_insert('test-3');
-- EXECUTE prepared_coercion_to_domain_insert('test-4');
-- EXECUTE prepared_coercion_to_domain_insert('test-5');
-- EXECUTE prepared_coercion_to_domain_insert('test-6');
-- EXECUTE prepared_coercion_to_domain_insert('test-7');
PREPARE prepared_coercion_to_domain_insert(text) AS
INSERT INTO domain_partition_column_table VALUES ($1, 1);
EXECUTE prepared_coercion_to_domain_insert('test-1');
EXECUTE prepared_coercion_to_domain_insert('test-2');
EXECUTE prepared_coercion_to_domain_insert('test-3');
EXECUTE prepared_coercion_to_domain_insert('test-4');
EXECUTE prepared_coercion_to_domain_insert('test-5');
EXECUTE prepared_coercion_to_domain_insert('test-6');
EXECUTE prepared_coercion_to_domain_insert('test-7');
PREPARE FOO AS INSERT INTO http_request (
site_id, ingest_time, url, request_country,
ip_address, status_code, response_time_msec

View File

@ -65,19 +65,6 @@ SELECT create_distributed_table('text_partition_column_table', 'key');
-- Domain type columns can give issues
-- and we use offset to prevent output diverging
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below. Also uncomment related tests on
-- prepared_statements_4 test file.
SELECT run_command_on_workers($$
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$')
$$) OFFSET 10000;
run_command_on_workers
---------------------------------------------------------------------
(0 rows)
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
CREATE TABLE domain_partition_column_table (
key test_key NOT NULL,
value int
@ -88,7 +75,6 @@ SELECT create_distributed_table('domain_partition_column_table', 'key');
(1 row)
RESET citus.enable_metadata_sync;
-- verify we re-evaluate volatile functions every time
CREATE TABLE http_request (
site_id INT,

View File

@ -317,7 +317,7 @@ test: ssl_by_default
# TODO: After deprecating parameterless create_distributed_function combine
# distributed_functions and function_propagation tests
# ---------
test: distributed_types distributed_types_conflict disable_object_propagation distributed_types_xact_add_enum_value text_search
test: distributed_types distributed_types_conflict disable_object_propagation distributed_types_xact_add_enum_value text_search distributed_domain
test: check_mx
test: distributed_functions distributed_functions_conflict
test: distributed_collations

View File

@ -46,8 +46,8 @@ CREATE TYPE nested_composite_type AS (
a composite_type,
b composite_type
);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$);
CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type;
CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type;
INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type);
@ -74,8 +74,8 @@ CREATE TYPE binaryless_composite_type AS (
b aclitem
);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$);
CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem;
CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type;
INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test');
SELECT col1 FROM binaryless_builtin;

View File

@ -214,21 +214,8 @@ SELECT count(*) FROM coordinator_evaluation_table_2 WHERE key = 101;
CREATE TYPE comptype_int as (int_a int);
CREATE DOMAIN domain_comptype_int AS comptype_int CHECK ((VALUE).int_a > 0);
-- citus does not propagate domain types
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below.
SELECT run_command_on_workers(
$$
CREATE DOMAIN coordinator_evaluation.domain_comptype_int AS coordinator_evaluation.comptype_int CHECK ((VALUE).int_a > 0)
$$);
CREATE TABLE reference_table(column_a coordinator_evaluation.domain_comptype_int);
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
SELECT create_reference_table('reference_table');
RESET citus.enable_metadata_sync;
INSERT INTO reference_table (column_a) VALUES ('(1)');
INSERT INTO reference_table (column_a) VALUES ('(2)'), ('(3)');

View File

@ -0,0 +1,487 @@
CREATE SCHEMA distributed_domain;
SET search_path TO distributed_domain;
SET citus.next_shard_id TO 93631000;
-- verify domain is not already present on workers
SELECT * FROM run_command_on_workers($$
SELECT 'distributed_domain.age'::regtype;
$$) ORDER BY 1,2;
CREATE DOMAIN age AS int CHECK( VALUE >= 0 );
-- check domain exists on workers to proof the domain got propagated
SELECT * FROM run_command_on_workers($$
SELECT 'distributed_domain.age'::regtype;
$$) ORDER BY 1,2;
-- verify the constraint triggers when operations that conflict are pushed to the shards
CREATE TABLE foo (a int);
SELECT create_distributed_table('foo', 'a', shard_count => 2);
CREATE TABLE bar (a age);
SELECT create_distributed_table('bar', 'a', shard_count => 2);
INSERT INTO foo (a) VALUES (-1); -- foo can have negative values
INSERT INTO bar (a) SELECT a FROM foo; -- bar cannot
DROP TABLE bar; -- need to drop this one directly, there is currently a bug in drop schema cascade when it drops both a table and the type of the distribution column at the same time
-- create a domain that is not propagated
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN us_postal_code AS TEXT
CHECK(
VALUE ~ '^\d{5}$'
OR VALUE ~ '^\d{5}-\d{4}$'
);
RESET citus.enable_ddl_propagation;
-- and use in distributed table to trigger domain propagation
CREATE TABLE us_snail_addy (
address_id SERIAL PRIMARY KEY,
street1 TEXT NOT NULL,
street2 TEXT,
street3 TEXT,
city TEXT NOT NULL,
postal us_postal_code NOT NULL
);
SELECT create_distributed_table('us_snail_addy', 'address_id');
-- defaults are marked as a constraint on the statement, which makes it interesting for deparsing, so some extensive
-- test coverage to make sure it works in all strange cases.
CREATE SCHEMA distributed_domain_constraints;
SET search_path TO distributed_domain_constraints;
CREATE DOMAIN with_default AS int DEFAULT 0;
CREATE DOMAIN age_with_default AS int DEFAULT 0 CHECK (value > 0);
CREATE DOMAIN age_with_default2 AS int CHECK (value > 0) DEFAULT 0;
CREATE DOMAIN age_with_default3 AS int CHECK (value > 0) DEFAULT NULL;
CREATE DOMAIN age_with_default4 AS int NOT NULL CHECK (value > 0) DEFAULT NULL;
-- test casting with worker queries
-- should simply work, has no check constraints
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.with_default;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.with_default;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.with_default;
$$) ORDER BY 1,2;
-- has a constraint where the number needs to be greater than 0
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default;
$$) ORDER BY 1,2;
-- has a constraint where the number needs to be greater than 0
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default2;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default2;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default2;
$$) ORDER BY 1,2;
-- has a constraint where the number needs to be greater than 0
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default3;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default3;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default3;
$$) ORDER BY 1,2;
-- has a constraint where the number needs to be greater than 0 and not null
SELECT * FROM run_command_on_workers($$
SELECT NULL::distributed_domain_constraints.age_with_default4;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 0::distributed_domain_constraints.age_with_default4;
$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$
SELECT 1::distributed_domain_constraints.age_with_default4;
$$) ORDER BY 1,2;
-- test usage in distributed tables
-- we use all domains defined earlier and insert all possible values, NULL, default, non-violation, violation. Some of
-- the default values will violate a constraint.
-- Based on the constraints on the domain we see different errors and will end up with a final set of data stored.
CREATE TABLE use_default (a int, b with_default);
SELECT create_distributed_table('use_default', 'a', shard_count => 2);
INSERT INTO use_default (a, b) VALUES (0, NULL);
INSERT INTO use_default (a) VALUES (1);
INSERT INTO use_default (a, b) VALUES (2, 1);
INSERT INTO use_default (a, b) VALUES (3, -1);
SELECT * FROM use_default ORDER BY a;
CREATE TABLE use_age_default (a int, b age_with_default);
SELECT create_distributed_table('use_age_default', 'a', shard_count => 2);
INSERT INTO use_age_default (a, b) VALUES (0, NULL);
INSERT INTO use_age_default (a) VALUES (1);
INSERT INTO use_age_default (a, b) VALUES (2, 1);
INSERT INTO use_age_default (a, b) VALUES (3, -1);
-- also load some data with copy to verify coercions.
\COPY use_age_default FROM STDIN DELIMITER AS ',';
4, -1
\.
\COPY use_age_default FROM STDIN DELIMITER AS ',';
5, 1
\.
SELECT * FROM use_age_default ORDER BY a;
CREATE TABLE use_age_default2 (a int, b age_with_default2);
SELECT create_distributed_table('use_age_default2', 'a', shard_count => 2);
INSERT INTO use_age_default2 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default2 (a) VALUES (1);
INSERT INTO use_age_default2 (a, b) VALUES (2, 1);
INSERT INTO use_age_default2 (a, b) VALUES (3, -1);
SELECT * FROM use_age_default2 ORDER BY a;
CREATE TABLE use_age_default3 (a int, b age_with_default3);
SELECT create_distributed_table('use_age_default3', 'a', shard_count => 2);
INSERT INTO use_age_default3 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default3 (a) VALUES (1);
INSERT INTO use_age_default3 (a, b) VALUES (2, 1);
INSERT INTO use_age_default3 (a, b) VALUES (3, -1);
SELECT * FROM use_age_default3 ORDER BY a;
CREATE TABLE use_age_default4 (a int, b age_with_default4);
SELECT create_distributed_table('use_age_default4', 'a', shard_count => 2);
INSERT INTO use_age_default4 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default4 (a) VALUES (1);
INSERT INTO use_age_default4 (a, b) VALUES (2, 1);
INSERT INTO use_age_default4 (a, b) VALUES (3, -1);
SELECT * FROM use_age_default4 ORDER BY a;
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain_constraints CASCADE;
RESET client_min_messages;
-- same tests as above, with with just in time propagation of domains
CREATE SCHEMA distributed_domain_constraints;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN with_default AS int DEFAULT 0;
CREATE DOMAIN age_with_default AS int DEFAULT 0 CHECK (value > 0);
CREATE DOMAIN age_with_default2 AS int CHECK (value > 0) DEFAULT 0;
CREATE DOMAIN age_with_default3 AS int CHECK (value > 0) DEFAULT NULL;
CREATE DOMAIN age_with_default4 AS int NOT NULL CHECK (value > 0) DEFAULT NULL;
RESET citus.enable_ddl_propagation;
-- use all domains in tables to get them propagated
CREATE TABLE use_default (a int, b with_default);
SELECT create_distributed_table('use_default', 'a', shard_count => 2);
INSERT INTO use_default (a, b) VALUES (0, NULL);
INSERT INTO use_default (a) VALUES (1);
INSERT INTO use_default (a, b) VALUES (2, 1);
INSERT INTO use_default (a, b) VALUES (3, -1);
SELECT * FROM use_default ORDER BY a;
CREATE TABLE use_age_default (a int, b age_with_default);
SELECT create_distributed_table('use_age_default', 'a', shard_count => 2);
INSERT INTO use_age_default (a, b) VALUES (0, NULL);
INSERT INTO use_age_default (a) VALUES (1);
INSERT INTO use_age_default (a, b) VALUES (2, 1);
INSERT INTO use_age_default (a, b) VALUES (3, -1);
SELECT * FROM use_age_default ORDER BY a;
CREATE TABLE use_age_default2 (a int, b age_with_default2);
SELECT create_distributed_table('use_age_default2', 'a', shard_count => 2);
INSERT INTO use_age_default2 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default2 (a) VALUES (1);
INSERT INTO use_age_default2 (a, b) VALUES (2, 1);
INSERT INTO use_age_default2 (a, b) VALUES (3, -1);
SELECT * FROM use_age_default2 ORDER BY a;
CREATE TABLE use_age_default3 (a int, b age_with_default3);
SELECT create_distributed_table('use_age_default3', 'a', shard_count => 2);
INSERT INTO use_age_default3 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default3 (a) VALUES (1);
INSERT INTO use_age_default3 (a, b) VALUES (2, 1);
INSERT INTO use_age_default3 (a, b) VALUES (3, -1);
SELECT * FROM use_age_default3 ORDER BY a;
CREATE TABLE use_age_default4 (a int, b age_with_default4);
SELECT create_distributed_table('use_age_default4', 'a', shard_count => 2);
INSERT INTO use_age_default4 (a, b) VALUES (0, NULL);
INSERT INTO use_age_default4 (a) VALUES (1);
INSERT INTO use_age_default4 (a, b) VALUES (2, 1);
INSERT INTO use_age_default4 (a, b) VALUES (3, -1);
SELECT * FROM use_age_default4 ORDER BY a;
-- clean up
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain_constraints CASCADE;
RESET client_min_messages;
CREATE SCHEMA postgres_domain_examples;
SET search_path TO postgres_domain_examples;
-- make sure the function gets automatically propagated when we propagate the domain
SET citus.enable_ddl_propagation TO off;
create function sql_is_distinct_from(anyelement, anyelement)
returns boolean language sql
as 'select $1 is distinct from $2 limit 1';
RESET citus.enable_ddl_propagation;
CREATE DOMAIN inotnull int
CHECK (sql_is_distinct_from(value, null));
SELECT * FROM run_command_on_workers($$ SELECT 1::postgres_domain_examples.inotnull; $$);
SELECT * FROM run_command_on_workers($$ SELECT null::postgres_domain_examples.inotnull; $$);
-- create a domain with sql function as a default value
SET citus.enable_ddl_propagation TO off;
create function random_between(min int, max int)
returns int language sql
as 'SELECT round(random()*($2-$1))+$1;';
RESET citus.enable_ddl_propagation;
-- this verifies the function in the default expression is found and distributed, otherwise the creation of the domain would fail on the workers.
CREATE DOMAIN with_random_default int DEFAULT random_between(100, 200);
SET client_min_messages TO warning;
DROP SCHEMA postgres_domain_examples CASCADE;
RESET client_min_messages;
SET search_path TO distributed_domain;
-- verify drops are propagated
CREATE DOMAIN will_drop AS text DEFAULT 'foo';
DROP DOMAIN will_drop;
-- verify domain is dropped from workers
SELECT * FROM run_command_on_workers($$ SELECT 'dropped?'::distributed_domain.will_drop; $$);
-- verify the type modifiers are deparsed correctly, both for direct propagation as well as on demand propagation
CREATE DOMAIN varcharmod AS varchar(3);
SELECT * FROM run_command_on_workers($$ SELECT '12345'::distributed_domain.varcharmod; $$) ORDER BY 1,2;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN varcharmod_ondemand AS varchar(3);
RESET citus.enable_ddl_propagation;
CREATE TABLE use_varcharmod_ondemand (a int, b varcharmod_ondemand);
SELECT create_distributed_table('use_varcharmod_ondemand', 'a');
-- epxected error due to value being too long for varchar(3)
INSERT INTO use_varcharmod_ondemand VALUES (1,'12345');
SELECT * FROM run_command_on_workers($$ SELECT '12345'::distributed_domain.varcharmod_ondemand; $$) ORDER BY 1,2;
-- section testing default altering
CREATE DOMAIN alter_default AS text DEFAULT 'foo';
CREATE TABLE use_alter_default (a int, b alter_default);
SELECT create_distributed_table('use_alter_default', 'a', shard_count => 4);
INSERT INTO use_alter_default (a) VALUES (1);
ALTER DOMAIN alter_default SET DEFAULT 'bar';
INSERT INTO use_alter_default (a) VALUES (2);
ALTER DOMAIN alter_default DROP DEFAULT;
INSERT INTO use_alter_default (a) VALUES (3);
SELECT * FROM use_alter_default ORDER BY 1,2;
-- add new dependency while adding default
CREATE DOMAIN add_default_with_function AS int;
SET citus.enable_ddl_propagation TO off;
create function random_between(min int, max int)
returns int language sql
as 'SELECT round(random()*($2-$1))+$1;';
RESET citus.enable_ddl_propagation;
ALTER DOMAIN add_default_with_function SET DEFAULT random_between(100, 200);
CREATE TABLE use_add_default_with_function (a int, b add_default_with_function);
SELECT create_distributed_table('use_add_default_with_function', 'a', shard_count => 4);
INSERT INTO use_add_default_with_function (a) VALUES (1);
-- altering NULL/NOT NULL
CREATE DOMAIN alter_null AS int;
CREATE TABLE use_alter_null (a int, b alter_null);
SELECT create_distributed_table('use_alter_null', 'a');
INSERT INTO use_alter_null (a) VALUES (1);
ALTER DOMAIN alter_null SET NOT NULL;
TRUNCATE use_alter_null;
ALTER DOMAIN alter_null SET NOT NULL;
INSERT INTO use_alter_null (a) VALUES (2);
ALTER DOMAIN alter_null DROP NOT NULL;
INSERT INTO use_alter_null (a) VALUES (3);
SELECT * FROM use_alter_null ORDER BY 1;
-- testing adding/dropping constraints
SET citus.enable_ddl_propagation TO off;
create function sql_is_distinct_from(anyelement, anyelement)
returns boolean language sql
as 'select $1 is distinct from $2 limit 1';
RESET citus.enable_ddl_propagation;
CREATE DOMAIN alter_add_constraint int;
ALTER DOMAIN alter_add_constraint ADD CONSTRAINT check_distinct CHECK (sql_is_distinct_from(value, null));
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.alter_add_constraint; $$);
SELECT * FROM run_command_on_workers($$ SELECT null::distributed_domain.alter_add_constraint; $$);
ALTER DOMAIN alter_add_constraint DROP CONSTRAINT check_distinct;
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.alter_add_constraint; $$);
SELECT * FROM run_command_on_workers($$ SELECT null::distributed_domain.alter_add_constraint; $$);
ALTER DOMAIN alter_add_constraint DROP CONSTRAINT IF EXISTS check_distinct;
ALTER DOMAIN alter_add_constraint ADD CONSTRAINT check_distinct CHECK (sql_is_distinct_from(value, null));
ALTER DOMAIN alter_add_constraint RENAME CONSTRAINT check_distinct TO check_distinct_renamed;
ALTER DOMAIN alter_add_constraint DROP CONSTRAINT check_distinct_renamed;
-- test validating invalid constraints
CREATE DOMAIN age_invalid AS int NOT NULL DEFAULT 0;
CREATE TABLE use_age_invalid (a int, b age_invalid);
SELECT create_distributed_table('use_age_invalid', 'a', shard_count => 4);
INSERT INTO use_age_invalid VALUES (1,1), (2, 2), (3, 0), (4, -1);
ALTER DOMAIN age_invalid ADD CONSTRAINT check_age_positive CHECK (value>=0) NOT VALID;
-- should fail, even though constraint is not valid
INSERT INTO use_age_invalid VALUES (5,-1);
-- reading violating data of an non-valid constraint errors in citus
SELECT * FROM use_age_invalid ORDER BY 1;
-- should fail since there is data in the table that violates the check
ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
DELETE FROM use_age_invalid WHERE b < 0;
-- should succeed now since the violating data has been removed
ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
-- still fails for constraint
INSERT INTO use_age_invalid VALUES (5,-1);
SELECT * FROM use_age_invalid ORDER BY 1;
-- verify we can validate a constraint that is already validated, can happen when we add a node while a domain constraint was not validated
ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
-- test changing the owner of a domain
SET client_min_messages TO error;
SELECT 1 FROM run_command_on_workers($$ CREATE ROLE domain_owner; $$);
CREATE ROLE domain_owner;
RESET client_min_messages;
CREATE DOMAIN alter_domain_owner AS int;
ALTER DOMAIN alter_domain_owner OWNER TO domain_owner;
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
SELECT * FROM run_command_on_workers($$
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
$$) ORDER BY 1,2;
DROP DOMAIN alter_domain_owner;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN alter_domain_owner AS int;
ALTER DOMAIN alter_domain_owner OWNER TO domain_owner;
RESET citus.enable_ddl_propagation;
CREATE TABLE use_alter_domain_owner (a int, b alter_domain_owner);
SELECT create_distributed_table('use_alter_domain_owner', 'a', shard_count => 4);
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
SELECT * FROM run_command_on_workers($$
SELECT u.rolname
FROM pg_type t
JOIN pg_roles u
ON (t.typowner = u.oid)
WHERE t.oid = 'distributed_domain.alter_domain_owner'::regtype;
$$) ORDER BY 1,2;
-- rename the domain
ALTER DOMAIN alter_domain_owner RENAME TO renamed_domain;
SELECT * FROM run_command_on_workers($$ SELECT NULL::distributed_domain.renamed_domain; $$) ORDER BY 1,2;
-- move schema
SET citus.enable_ddl_propagation TO off;
CREATE SCHEMA distributed_domain_moved;
RESET citus.enable_ddl_propagation;
ALTER DOMAIN renamed_domain SET SCHEMA distributed_domain_moved;
-- test collation
CREATE COLLATION german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
CREATE DOMAIN with_collation AS text COLLATE german_phonebook NOT NULL;
SELECT run_command_on_workers($$ SELECT typcollation::regcollation FROM pg_type WHERE oid = 'distributed_domain.with_collation'::regtype; $$);
DROP DOMAIN with_collation;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN with_collation AS text COLLATE german_phonebook NOT NULL;
RESET citus.enable_ddl_propagation;
CREATE TABLE use_with_collation (a int, b with_collation);
SELECT create_reference_table('use_with_collation');
SELECT run_command_on_workers($$ SELECT typcollation::regcollation FROM pg_type WHERE oid = 'distributed_domain.with_collation'::regtype; $$);
INSERT INTO use_with_collation VALUES (1, U&'\00E4sop'), (2, 'Vossr');
SELECT * FROM use_with_collation WHERE b < 'b';
-- test domain backed by array
CREATE DOMAIN domain_array AS int[] NOT NULL CHECK (array_length(value,1) >= 2);
SELECT * FROM run_command_on_workers($$ SELECT NULL::distributed_domain.domain_array; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT ARRAY[1]::distributed_domain.domain_array; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT ARRAY[1,2]::distributed_domain.domain_array; $$) ORDER BY 1,2;
DROP DOMAIN domain_array;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN domain_array AS int[] NOT NULL CHECK (array_length(value,1) >= 2);
RESET citus.enable_ddl_propagation;
CREATE TABLE use_domain_array (a int, b domain_array);
SELECT create_distributed_table('use_domain_array', 'a');
INSERT INTO use_domain_array VALUES (1, NULL);
INSERT INTO use_domain_array VALUES (2, ARRAY[1]);
INSERT INTO use_domain_array VALUES (3, ARRAY[1,2]);
SELECT * FROM use_domain_array ORDER BY 1;
-- add nameless constraint
CREATE DOMAIN nameless_constraint AS int;
ALTER DOMAIN nameless_constraint ADD CHECK (value > 0);
SELECT * FROM run_command_on_workers($$ SELECT NULL::distributed_domain.nameless_constraint; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.nameless_constraint; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT (-1)::distributed_domain.nameless_constraint; $$) ORDER BY 1,2;
-- Test domains over domains
create domain vchar4 varchar(4);
create domain dinter vchar4 check (substring(VALUE, 1, 1) = 'x');
create domain dtop dinter check (substring(VALUE, 2, 1) = '1');
create table dtest(f1 dtop, id bigserial);
SELECT create_distributed_table('dtest', 'id');
insert into dtest values('x123');
insert into dtest values('123');
insert into dtest values('x234');
DROP TABLE dtest;
DROP DOMAIN IF EXISTS dtop;
DROP DOMAIN vchar4;
DROP DOMAIN vchar4 CASCADE;
-- drop multiple domains at once, for which one is not distributed
CREATE DOMAIN domain1 AS int;
CREATE DOMAIN domain2 AS text;
SET citus.enable_ddl_propagation TO off;
CREATE DOMAIN domain3 AS text;
RESET citus.enable_ddl_propagation;
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.domain1; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain2; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain3; $$) ORDER BY 1,2;
DROP DOMAIN domain1, domain2, domain3;
SELECT * FROM run_command_on_workers($$ SELECT 1::distributed_domain.domain1; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain2; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT '1'::distributed_domain.domain3; $$) ORDER BY 1,2;
SET client_min_messages TO warning;
DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE;

View File

@ -260,20 +260,8 @@ UPDATE field_indirection_test_2 SET (ct2_col.text_1, ct1_col.int_2) = ('text2',
CREATE TYPE two_ints as (if1 int, if2 int);
CREATE DOMAIN domain AS two_ints CHECK ((VALUE).if1 > 0);
-- citus does not propagate domain objects
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below.
SELECT run_command_on_workers(
$$
CREATE DOMAIN type_tests.domain AS type_tests.two_ints CHECK ((VALUE).if1 > 0);
$$);
CREATE TABLE domain_indirection_test (f1 int, f3 domain, domain_array domain[]);
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
SELECT create_distributed_table('domain_indirection_test', 'f1');
RESET citus.enable_metadata_sync;
-- not supported (field indirection to underlying composite type)
INSERT INTO domain_indirection_test (f1,f3.if1, f3.if2) VALUES (0, 1, 2);

View File

@ -547,22 +547,12 @@ DROP TABLE text_partition_column_table;
-- Domain type columns can give issues
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below.
SELECT run_command_on_workers($$
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$')
$$);
CREATE TABLE domain_partition_column_table (
key test_key NOT NULL,
value int
);
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
SELECT create_distributed_table('domain_partition_column_table', 'key');
RESET citus.enable_metadata_sync;
PREPARE prepared_coercion_to_domain_insert(text) AS
INSERT INTO domain_partition_column_table VALUES ($1, 1);

View File

@ -14,21 +14,16 @@ EXECUTE prepared_relabel_insert('test');
SELECT key, value FROM text_partition_column_table ORDER BY key;
PREPARE prepared_coercion_to_domain_insert(text) AS
INSERT INTO domain_partition_column_table VALUES ($1, 1);
-- TODO: Uncomment tests below once domains are supported
-- PREPARE prepared_coercion_to_domain_insert(text) AS
-- INSERT INTO domain_partition_column_table VALUES ($1, 1);
-- EXECUTE prepared_coercion_to_domain_insert('test-1');
-- EXECUTE prepared_coercion_to_domain_insert('test-2');
-- EXECUTE prepared_coercion_to_domain_insert('test-3');
-- EXECUTE prepared_coercion_to_domain_insert('test-4');
-- EXECUTE prepared_coercion_to_domain_insert('test-5');
-- EXECUTE prepared_coercion_to_domain_insert('test-6');
-- EXECUTE prepared_coercion_to_domain_insert('test-7');
EXECUTE prepared_coercion_to_domain_insert('test-1');
EXECUTE prepared_coercion_to_domain_insert('test-2');
EXECUTE prepared_coercion_to_domain_insert('test-3');
EXECUTE prepared_coercion_to_domain_insert('test-4');
EXECUTE prepared_coercion_to_domain_insert('test-5');
EXECUTE prepared_coercion_to_domain_insert('test-6');
EXECUTE prepared_coercion_to_domain_insert('test-7');
PREPARE FOO AS INSERT INTO http_request (
site_id, ingest_time, url, request_country,

View File

@ -54,24 +54,12 @@ SELECT create_distributed_table('text_partition_column_table', 'key');
-- and we use offset to prevent output diverging
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
-- TODO: Once domains are supported, remove enable_metadata_sync off/on change
-- on dependent table distribution below. Also uncomment related tests on
-- prepared_statements_4 test file.
SELECT run_command_on_workers($$
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$')
$$) OFFSET 10000;
-- Disable metadata sync since citus doesn't support distributing
-- domains for now.
SET citus.enable_metadata_sync TO OFF;
CREATE TABLE domain_partition_column_table (
key test_key NOT NULL,
value int
);
SELECT create_distributed_table('domain_partition_column_table', 'key');
RESET citus.enable_metadata_sync;
-- verify we re-evaluate volatile functions every time
CREATE TABLE http_request (