diff --git a/src/backend/distributed/commands/create_citus_local_table.c b/src/backend/distributed/commands/create_citus_local_table.c new file mode 100644 index 000000000..ea16bf390 --- /dev/null +++ b/src/backend/distributed/commands/create_citus_local_table.c @@ -0,0 +1,890 @@ +/*------------------------------------------------------------------------- + * + * create_citus_local_table.c + * + * This file contains functions to create citus local tables. + * + * A citus local table is composed of a shell relation to wrap the + * the regular postgres relation as its coordinator local shard. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/htup_details.h" +#include "catalog/pg_constraint.h" +#include "catalog/pg_trigger.h" +#include "distributed/coordinator_protocol.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/colocation_utils.h" +#include "distributed/commands.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/listutils.h" +#include "distributed/metadata_sync.h" +#include "distributed/multi_partitioning_utils.h" +#include "distributed/namespace_utils.h" +#include "distributed/reference_table_utils.h" +#include "distributed/worker_protocol.h" +#include "distributed/worker_shard_visibility.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/ruleutils.h" +#include "utils/syscache.h" + + +static void CreateCitusLocalTable(Oid relationId); +static void ErrorIfUnsupportedCreateCitusLocalTable(Relation relation); +static void ErrorIfUnsupportedCitusLocalTableKind(Oid relationId); +static List * GetShellTableDDLEventsForCitusLocalTable(Oid relationId); +static uint64 ConvertLocalTableToShard(Oid relationId); +static void RenameRelationToShardRelation(Oid shellRelationId, uint64 shardId); +static void RenameShardRelationConstraints(Oid shardRelationId, uint64 shardId); +static List * GetConstraintNameList(Oid relationId); +static char * GetRenameShardConstraintCommand(Oid relationId, char *constraintName, + uint64 shardId); +static void RenameShardRelationIndexes(Oid shardRelationId, uint64 shardId); +static char * GetDropTriggerCommand(Oid relationId, char *triggerName); +static char * GetRenameShardIndexCommand(char *indexName, uint64 shardId); +static void RenameShardRelationNonTruncateTriggers(Oid shardRelationId, uint64 shardId); +static char * GetRenameShardTriggerCommand(Oid shardRelationId, char *triggerName, + uint64 shardId); +static void ExecuteAndLogDDLCommandList(List *ddlCommandList); +static void ExecuteAndLogDDLCommand(const char *commandString); +static void DropRelationTruncateTriggers(Oid relationId); +static char * GetDropTriggerCommand(Oid relationId, char *triggerName); +static List * GetExplicitIndexNameList(Oid relationId); +static void DropAndMoveDefaultSequenceOwnerships(Oid sourceRelationId, + Oid targetRelationId); +static void ExtractColumnsOwningSequences(Oid relationId, List **columnNameList, + List **ownedSequenceIdList); +static void DropDefaultColumnDefinition(Oid relationId, char *columnName); +static void TransferSequenceOwnership(Oid ownedSequenceId, Oid targetRelationId, + char *columnName); +static void InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId); +static void FinalizeCitusLocalTableCreation(Oid relationId); + + +PG_FUNCTION_INFO_V1(create_citus_local_table); + + +/* + * create_citus_local_table creates a citus local table from the table with + * relationId by executing the internal method CreateCitusLocalTable. + * (See CreateCitusLocalTable function's comment.) + */ +Datum +create_citus_local_table(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Oid relationId = PG_GETARG_OID(0); + + CreateCitusLocalTable(relationId); + + PG_RETURN_VOID(); +} + + +/* + * CreateCitusLocalTable is the internal method that creates a citus table + * from the table with relationId. The created table would have the following + * properties: + * - it will have only one shard, + * - its distribution method will be DISTRIBUTE_BY_NONE, + * - its replication model will be ReplicationModel, + * - its replication factor will be set to 1. + * Similar to reference tables, it has only 1 placement. In addition to that, that + * single placement is only allowed to be on the coordinator. + */ +static void +CreateCitusLocalTable(Oid relationId) +{ + /* + * These checks should be done before acquiring any locks on relation. + * This is because we don't allow creating citus local tables in worker + * nodes and we don't want to acquire any locks on a table if we are not + * the owner of it. + */ + EnsureCoordinator(); + EnsureTableOwner(relationId); + + /* + * Lock target relation with an AccessExclusiveLock as we don't want + * multiple backends manipulating this relation. We could actually simply + * lock the relation without opening it. However, we also want to check + * if the relation does not exist or dropped by another backend. Also, + * we open the relation with try_relation_open instead of relation_open + * to give a nice error in case the table is dropped by another backend. + */ + Relation relation = try_relation_open(relationId, AccessExclusiveLock); + + ErrorIfUnsupportedCreateCitusLocalTable(relation); + + /* + * We immediately close relation with NoLock right after opening it. This is + * because, in this function, we may execute ALTER TABLE commands modifying + * relation's column definitions and postgres does not allow us to do so when + * the table is still open. (See the postgres function CheckTableNotInUse for + * more information) + */ + relation_close(relation, NoLock); + + + ObjectAddress tableAddress = { 0 }; + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + + /* + * Ensure dependencies first as we will create shell table on the other nodes + * in the MX case. + */ + EnsureDependenciesExistOnAllNodes(&tableAddress); + + /* + * Make sure that existing reference tables have been replicated to all + * the nodes such that we can create foreign keys and joins work + * immediately after creation. + */ + EnsureReferenceTablesExistOnAllNodes(); + + List *shellTableDDLEvents = GetShellTableDDLEventsForCitusLocalTable(relationId); + + char *relationName = get_rel_name(relationId); + Oid relationSchemaId = get_rel_namespace(relationId); + + /* below we convert relation with relationId to the shard relation */ + uint64 shardId = ConvertLocalTableToShard(relationId); + + /* + * As we retrieved the DDL commands necessary to create the shell table + * from scratch, below we simply recreate the shell table executing them + * via process utility. + */ + ExecuteAndLogDDLCommandList(shellTableDDLEvents); + + /* + * Set shellRelationId as the relation with relationId now points + * to the shard relation. + */ + Oid shardRelationId = relationId; + Oid shellRelationId = get_relname_relid(relationName, relationSchemaId); + + /* assert that we created the shell table properly in the same schema */ + Assert(OidIsValid(shellRelationId)); + + /* + * Move sequence ownerships from shard table to shell table and also drop + * DEFAULT expressions from shard relation as we should evaluate such columns + * in shell table when needed. + */ + DropAndMoveDefaultSequenceOwnerships(shardRelationId, shellRelationId); + + InsertMetadataForCitusLocalTable(shellRelationId, shardId); + + FinalizeCitusLocalTableCreation(shellRelationId); +} + + +/* + * ErrorIfUnsupportedCreateCitusLocalTable errors out if we cannot create the + * citus local table from the relation. + */ +static void +ErrorIfUnsupportedCreateCitusLocalTable(Relation relation) +{ + if (!RelationIsValid(relation)) + { + ereport(ERROR, (errmsg("cannot create citus local table, relation does " + "not exist"))); + } + + Oid relationId = relation->rd_id; + + ErrorIfCoordinatorNotAddedAsWorkerNode(); + ErrorIfUnsupportedCitusLocalTableKind(relationId); + EnsureTableNotDistributed(relationId); + + /* + * When creating other citus table types, we don't need to check that case as + * EnsureTableNotDistributed already errors out if the given relation implies + * a citus table. However, as we don't mark the relation as citus table, i.e we + * do not use the relation with relationId as the shell relation, parallel + * create_citus_local_table executions would not error out for that relation. + * Hence we need to error out for shard relations too. + */ + ErrorIfRelationIsAKnownShard(relationId); + + /* + * We do not allow creating citus local table if the table is involved in a + * foreign key relationship with "any other table". Note that we allow self + * references. + */ + ErrorIfTableHasExternalForeignKeys(relationId); + + /* we do not support policies in citus community */ + ErrorIfUnsupportedPolicy(relation); +} + + +/* + * ErrorIfUnsupportedCitusLocalTableKind errors out if the relation kind of + * relation with relationId is not supported for citus local table creation. + */ +static void +ErrorIfUnsupportedCitusLocalTableKind(Oid relationId) +{ + const char *relationName = get_rel_name(relationId); + + if (IsChildTable(relationId) || IsParentTable(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create citus local table \"%s\", citus local " + "tables cannot be involved in inheritance relationships", + relationName))); + } + + if (PartitionTable(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create citus local table \"%s\", citus local " + "tables cannot be partition of other tables", + relationName))); + } + + char relationKind = get_rel_relkind(relationId); + if (!(relationKind == RELKIND_RELATION || relationKind == RELKIND_FOREIGN_TABLE)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create citus local table \"%s\", only regular " + "tables and foreign tables are supported for citus local " + "table creation", relationName))); + } +} + + +/* + * GetShellTableDDLEventsForCitusLocalTable returns a list of DDL commands + * to create the shell table from scratch. + */ +static List * +GetShellTableDDLEventsForCitusLocalTable(Oid relationId) +{ + /* + * As we don't allow foreign keys with other tables initially, below we + * only pick self-referencing foreign keys. + */ + List *foreignConstraintCommands = + GetReferencingForeignConstaintCommands(relationId); + + /* + * Include DEFAULT clauses for columns getting their default values from + * a sequence. + */ + bool includeSequenceDefaults = true; + + List *shellTableDDLEvents = GetTableDDLEvents(relationId, + includeSequenceDefaults); + shellTableDDLEvents = list_concat(shellTableDDLEvents, foreignConstraintCommands); + + return shellTableDDLEvents; +} + + +/* + * ConvertLocalTableToShard first acquires a shardId and then converts the + * given relation with relationId to the shard relation with shardId. That + * means, this function suffixes shardId to: + * - relation name, + * - all the objects "defined on" the relation. + * After converting the given relation, returns the acquired shardId. + */ +static uint64 +ConvertLocalTableToShard(Oid relationId) +{ + uint64 shardId = GetNextShardId(); + + RenameRelationToShardRelation(relationId, shardId); + RenameShardRelationConstraints(relationId, shardId); + RenameShardRelationIndexes(relationId, shardId); + + /* + * We do not create truncate triggers on shard relation. This is + * because truncate triggers are fired by utility hook and we would + * need to disable them to prevent executing them twice if we don't + * drop the trigger on shard relation. + */ + DropRelationTruncateTriggers(relationId); + + /* + * We create INSERT|DELETE|UPDATE triggers on shard relation too. + * This is because citus prevents postgres executor to fire those + * triggers. So, here we suffix such triggers on shard relation + * with shardId. + */ + RenameShardRelationNonTruncateTriggers(relationId, shardId); + + return shardId; +} + + +/* + * RenameRelationToShardRelation appends given shardId to the end of the name + * of relation with shellRelationId. + */ +static void +RenameRelationToShardRelation(Oid shellRelationId, uint64 shardId) +{ + char *qualifiedShellRelationName = generate_qualified_relation_name(shellRelationId); + + char *shellRelationName = get_rel_name(shellRelationId); + char *shardRelationName = pstrdup(shellRelationName); + AppendShardIdToName(&shardRelationName, shardId); + const char *quotedShardRelationName = quote_identifier(shardRelationName); + + StringInfo renameCommand = makeStringInfo(); + appendStringInfo(renameCommand, "ALTER TABLE %s RENAME TO %s;", + qualifiedShellRelationName, quotedShardRelationName); + + ExecuteAndLogDDLCommand(renameCommand->data); +} + + +/* + * RenameShardRelationConstraints appends given shardId to the end of the name + * of constraints "defined on" the relation with shardRelationId. This function + * utilizes GetConstraintNameList to pick the constraints to be renamed, + * see more details in function's comment. + */ +static void +RenameShardRelationConstraints(Oid shardRelationId, uint64 shardId) +{ + List *constraintNameList = GetConstraintNameList(shardRelationId); + + char *constraintName = NULL; + foreach_ptr(constraintName, constraintNameList) + { + const char *commandString = + GetRenameShardConstraintCommand(shardRelationId, constraintName, shardId); + ExecuteAndLogDDLCommand(commandString); + } +} + + +/* + * GetConstraintNameList returns a list of constraint names "defined on" the + * relation with relationId. Those constraints can be: + * - "check" constraints or, + * - "primary key" constraints or, + * - "unique" constraints or, + * - "trigger" constraints or, + * - "exclusion" constraints or, + * - "foreign key" constraints in which the relation is the "referencing" + * relation (including the self-referencing foreign keys). + */ +static List * +GetConstraintNameList(Oid relationId) +{ + List *constraintNameList = NIL; + + int scanKeyCount = 1; + ScanKeyData scanKey[1]; + + Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, relationId); + + bool useIndex = true; + SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, + ConstraintRelidTypidNameIndexId, + useIndex, NULL, scanKeyCount, + scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); + + char *constraintName = NameStr(constraintForm->conname); + constraintNameList = lappend(constraintNameList, pstrdup(constraintName)); + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + table_close(pgConstraint, NoLock); + + return constraintNameList; +} + + +/* + * GetRenameShardConstraintCommand returns DDL command to append given + * shardId to the constrant with constraintName on the relation with + * relationId. + */ +static char * +GetRenameShardConstraintCommand(Oid relationId, char *constraintName, uint64 shardId) +{ + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + + char *shardConstraintName = pstrdup(constraintName); + AppendShardIdToName(&shardConstraintName, shardId); + const char *quotedShardConstraintName = quote_identifier(shardConstraintName); + + const char *quotedConstraintName = quote_identifier(constraintName); + + StringInfo renameCommand = makeStringInfo(); + appendStringInfo(renameCommand, "ALTER TABLE %s RENAME CONSTRAINT %s TO %s;", + qualifiedRelationName, quotedConstraintName, + quotedShardConstraintName); + + return renameCommand->data; +} + + +/* + * RenameShardRelationIndexes appends given shardId to the end of the names + * of shard relation indexes except the ones that are already renamed via + * RenameShardRelationConstraints. This function utilizes + * GetExplicitIndexNameList to pick the indexes to be renamed, see more + * details in function's comment. + */ +static void +RenameShardRelationIndexes(Oid shardRelationId, uint64 shardId) +{ + List *indexNameList = GetExplicitIndexNameList(shardRelationId); + + char *indexName = NULL; + foreach_ptr(indexName, indexNameList) + { + const char *commandString = GetRenameShardIndexCommand(indexName, shardId); + ExecuteAndLogDDLCommand(commandString); + } +} + + +/* + * GetRenameShardIndexCommand returns DDL command to append given shardId to + * the index with indexName. + */ +static char * +GetRenameShardIndexCommand(char *indexName, uint64 shardId) +{ + char *shardIndexName = pstrdup(indexName); + AppendShardIdToName(&shardIndexName, shardId); + const char *quotedShardIndexName = quote_identifier(shardIndexName); + + const char *quotedIndexName = quote_identifier(indexName); + + StringInfo renameCommand = makeStringInfo(); + appendStringInfo(renameCommand, "ALTER INDEX %s RENAME TO %s;", + quotedIndexName, quotedShardIndexName); + + return renameCommand->data; +} + + +/* + * RenameShardRelationNonTruncateTriggers appends given shardId to the end of + * the names of shard relation INSERT/DELETE/UPDATE triggers that are explicitly + * created. + */ +static void +RenameShardRelationNonTruncateTriggers(Oid shardRelationId, uint64 shardId) +{ + List *triggerIdList = GetExplicitTriggerIdList(shardRelationId); + + Oid triggerId = InvalidOid; + foreach_oid(triggerId, triggerIdList) + { + bool missingOk = false; + HeapTuple triggerTuple = GetTriggerTupleById(triggerId, missingOk); + Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(triggerTuple); + + if (!TRIGGER_FOR_TRUNCATE(triggerForm->tgtype)) + { + char *triggerName = NameStr(triggerForm->tgname); + char *commandString = + GetRenameShardTriggerCommand(shardRelationId, triggerName, shardId); + ExecuteAndLogDDLCommand(commandString); + } + + heap_freetuple(triggerTuple); + } +} + + +/* + * GetRenameShardTriggerCommand returns DDL command to append given shardId to + * the trigger with triggerName. + */ +static char * +GetRenameShardTriggerCommand(Oid shardRelationId, char *triggerName, uint64 shardId) +{ + char *qualifiedShardRelationName = generate_qualified_relation_name(shardRelationId); + + char *shardTriggerName = pstrdup(triggerName); + AppendShardIdToName(&shardTriggerName, shardId); + const char *quotedShardTriggerName = quote_identifier(shardTriggerName); + + const char *quotedTriggerName = quote_identifier(triggerName); + + StringInfo renameCommand = makeStringInfo(); + appendStringInfo(renameCommand, "ALTER TRIGGER %s ON %s RENAME TO %s;", + quotedTriggerName, qualifiedShardRelationName, + quotedShardTriggerName); + + return renameCommand->data; +} + + +/* + * ExecuteAndLogDDLCommandList takes a list of ddl commands and calls + * ExecuteAndLogDDLCommand function for each of them. + */ +static void +ExecuteAndLogDDLCommandList(List *ddlCommandList) +{ + char *ddlCommand = NULL; + foreach_ptr(ddlCommand, ddlCommandList) + { + ExecuteAndLogDDLCommand(ddlCommand); + } +} + + +/* + * ExecuteAndLogDDLCommand takes a ddl command and logs it in DEBUG4 log level. + * Then, parses and executes it via CitusProcessUtility. + */ +static void +ExecuteAndLogDDLCommand(const char *commandString) +{ + ereport(DEBUG4, (errmsg("executing \"%s\"", commandString))); + + Node *parseTree = ParseTreeNode(commandString); + CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); +} + + +/* + * DropRelationTruncateTriggers drops TRUNCATE triggers that are explicitly + * created on relation with relationId. + */ +static void +DropRelationTruncateTriggers(Oid relationId) +{ + List *triggerIdList = GetExplicitTriggerIdList(relationId); + + Oid triggerId = InvalidOid; + foreach_oid(triggerId, triggerIdList) + { + bool missingOk = false; + HeapTuple triggerTuple = GetTriggerTupleById(triggerId, missingOk); + Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(triggerTuple); + + if (TRIGGER_FOR_TRUNCATE(triggerForm->tgtype)) + { + char *triggerName = NameStr(triggerForm->tgname); + char *commandString = GetDropTriggerCommand(relationId, triggerName); + ExecuteAndLogDDLCommand(commandString); + } + + heap_freetuple(triggerTuple); + } +} + + +/* + * GetDropTriggerCommand returns DDL command to drop the trigger with triggerName + * on relationId. + */ +static char * +GetDropTriggerCommand(Oid relationId, char *triggerName) +{ + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + const char *quotedTriggerName = quote_identifier(triggerName); + + /* + * In postgres, the only possible object type that may depend on a trigger + * is the "constraint" object implied by the trigger itself if it is a + * constraint trigger, and it would be an internal dependency so it could + * be dropped without using CASCADE. Other than this, it is also possible + * to define dependencies on trigger via recordDependencyOn api by other + * extensions. We don't handle those kind of dependencies, we just drop + * them with CASCADE. + */ + StringInfo dropCommand = makeStringInfo(); + appendStringInfo(dropCommand, "DROP TRIGGER %s ON %s CASCADE;", + quotedTriggerName, qualifiedRelationName); + + return dropCommand->data; +} + + +/* + * GetExplicitIndexNameList returns a list of index names defined "explicitly" + * on the relation with relationId by the "CREATE INDEX" commands. That means, + * all the constraints defined on the relation except: + * - primary indexes, + * - unique indexes and + * - exclusion indexes + * that are actually applied by the related constraints. + */ +static List * +GetExplicitIndexNameList(Oid relationId) +{ + int scanKeyCount = 1; + ScanKeyData scanKey[1]; + + PushOverrideEmptySearchPath(CurrentMemoryContext); + + Relation pgIndex = table_open(IndexRelationId, AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_index_indrelid, + BTEqualStrategyNumber, F_OIDEQ, relationId); + + bool useIndex = true; + SysScanDesc scanDescriptor = systable_beginscan(pgIndex, IndexIndrelidIndexId, + useIndex, NULL, scanKeyCount, + scanKey); + + List *indexNameList = NIL; + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(heapTuple); + + Oid indexId = indexForm->indexrelid; + + bool indexImpliedByConstraint = IndexImpliedByAConstraint(indexForm); + + /* + * Skip the indexes that are not implied by explicitly executing + * a CREATE INDEX command. + */ + if (!indexImpliedByConstraint) + { + char *indexName = get_rel_name(indexId); + + indexNameList = lappend(indexNameList, pstrdup(indexName)); + } + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + table_close(pgIndex, NoLock); + + /* revert back to original search_path */ + PopOverrideSearchPath(); + + return indexNameList; +} + + +/* + * DropAndMoveDefaultSequenceOwnerships drops default column definitions for + * relation with sourceRelationId. Also, for each column that defaults to an + * owned sequence, it grants ownership to the same named column of the relation + * with targetRelationId. + */ +static void +DropAndMoveDefaultSequenceOwnerships(Oid sourceRelationId, Oid targetRelationId) +{ + List *columnNameList = NIL; + List *ownedSequenceIdList = NIL; + ExtractColumnsOwningSequences(sourceRelationId, &columnNameList, + &ownedSequenceIdList); + + ListCell *columnNameCell = NULL; + ListCell *ownedSequenceIdCell = NULL; + forboth(columnNameCell, columnNameList, ownedSequenceIdCell, ownedSequenceIdList) + { + char *columnName = (char *) lfirst(columnNameCell); + Oid ownedSequenceId = lfirst_oid(ownedSequenceIdCell); + + DropDefaultColumnDefinition(sourceRelationId, columnName); + + /* column might not own a sequence */ + if (OidIsValid(ownedSequenceId)) + { + TransferSequenceOwnership(ownedSequenceId, targetRelationId, columnName); + } + } +} + + +/* + * ExtractColumnsOwningSequences finds each column of relation with relationId + * defaulting to an owned sequence. Then, appends the column name and id of the + * owned sequence -that the column defaults- to the lists passed as NIL initially. + */ +static void +ExtractColumnsOwningSequences(Oid relationId, List **columnNameList, + List **ownedSequenceIdList) +{ + Assert(*columnNameList == NIL && *ownedSequenceIdList == NIL); + + Relation relation = relation_open(relationId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + + for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; + attributeIndex++) + { + Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); + if (attributeForm->attisdropped || !attributeForm->atthasdef) + { + /* + * If this column has already been dropped or it has no DEFAULT + * definition, skip it. + */ + continue; + } + + char *columnName = NameStr(attributeForm->attname); + *columnNameList = lappend(*columnNameList, columnName); + + List *columnOwnedSequences = + GetSequencesOwnedByColumn(relationId, attributeIndex + 1); + + Oid ownedSequenceId = InvalidOid; + if (list_length(columnOwnedSequences) != 0) + { + /* + * A column might only own one sequence. We intentionally use + * GetSequencesOwnedByColumn macro and pick initial oid from the + * list instead of using getOwnedSequence. This is both because + * getOwnedSequence is removed in pg13 and is also because it + * errors out if column does not have any sequences. + */ + Assert(list_length(columnOwnedSequences) == 1); + ownedSequenceId = linitial_oid(columnOwnedSequences); + } + + *ownedSequenceIdList = lappend_oid(*ownedSequenceIdList, ownedSequenceId); + } + + relation_close(relation, NoLock); +} + + +/* + * DropDefaultColumnDefinition drops the DEFAULT definiton of the column with + * columnName of the relation with relationId via process utility. + */ +static void +DropDefaultColumnDefinition(Oid relationId, char *columnName) +{ + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + const char *quotedColumnName = quote_identifier(columnName); + + StringInfo sequenceDropCommand = makeStringInfo(); + appendStringInfo(sequenceDropCommand, + "ALTER TABLE %s ALTER COLUMN %s DROP DEFAULT", + qualifiedRelationName, quotedColumnName); + + ExecuteAndLogDDLCommand(sequenceDropCommand->data); +} + + +/* + * TransferSequenceOwnership grants ownership of the sequence with sequenceId + * to the column with targetColumnName of relation with targetRelationId via + * process utility. Note that this function assumes that the target relation + * has a column with targetColumnName which can default to the given sequence. + */ +static void +TransferSequenceOwnership(Oid sequenceId, Oid targetRelationId, char *targetColumnName) +{ + char *qualifiedSequenceName = generate_qualified_relation_name(sequenceId); + char *qualifiedTargetRelationName = + generate_qualified_relation_name(targetRelationId); + const char *quotedTargetColumnName = quote_identifier(targetColumnName); + + StringInfo sequenceOwnershipCommand = makeStringInfo(); + appendStringInfo(sequenceOwnershipCommand, "ALTER SEQUENCE %s OWNED BY %s.%s", + qualifiedSequenceName, qualifiedTargetRelationName, + quotedTargetColumnName); + + ExecuteAndLogDDLCommand(sequenceOwnershipCommand->data); +} + + +/* + * InsertMetadataForCitusLocalTable inserts necessary metadata for the citus + * local table to the following metadata tables: + * pg_dist_partition, pg_dist_shard & pg_dist_placement. + */ +static void +InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId) +{ + Assert(OidIsValid(citusLocalTableId)); + Assert(shardId != INVALID_SHARD_ID); + + char distributionMethod = DISTRIBUTE_BY_NONE; + char replicationModel = ReplicationModel; + + Assert(replicationModel != REPLICATION_MODEL_2PC); + + uint32 colocationId = INVALID_COLOCATION_ID; + Var *distributionColumn = NULL; + InsertIntoPgDistPartition(citusLocalTableId, distributionMethod, + distributionColumn, colocationId, + replicationModel); + + /* set shard storage type according to relation type */ + char shardStorageType = ShardStorageType(citusLocalTableId); + + text *shardMinValue = NULL; + text *shardMaxValue = NULL; + InsertShardRow(citusLocalTableId, shardId, shardStorageType, + shardMinValue, shardMaxValue); + + List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError()); + + int replicationFactor = 1; + int workerStartIndex = 0; + InsertShardPlacementRows(citusLocalTableId, shardId, nodeList, + workerStartIndex, replicationFactor); +} + + +/* + * FinalizeCitusLocalTableCreation completes creation of the citus local table + * with relationId by performing operations that should be done after creating + * the shard and inserting the metadata. + */ +static void +FinalizeCitusLocalTableCreation(Oid relationId) +{ + /* + * If it is a foreign table, then skip creating citus truncate trigger + * as foreign tables do not support truncate triggers. + */ + if (RegularTable(relationId)) + { + CreateTruncateTrigger(relationId); + } + + if (ShouldSyncTableMetadata(relationId)) + { + CreateTableMetadataOnWorkers(relationId); + } + + /* + * We've a custom way of foreign key graph invalidation, + * see InvalidateForeignKeyGraph(). + */ + if (TableReferenced(relationId) || TableReferencing(relationId)) + { + InvalidateForeignKeyGraph(); + } +} diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 0d974fac1..d6c8c6678 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -103,7 +103,6 @@ static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, Oid distributionColumnType, Oid sourceRelationId); static void EnsureLocalTableEmpty(Oid relationId); -static void EnsureTableNotDistributed(Oid relationId); static void EnsureRelationHasNoTriggers(Oid relationId); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); @@ -398,7 +397,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, colocationId, replicationModel); - /* foreign tables does not support TRUNCATE trigger */ + /* foreign tables do not support TRUNCATE trigger */ if (RegularTable(relationId)) { CreateTruncateTrigger(relationId); @@ -424,7 +423,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio CreateReferenceTableShard(relationId); } - if (ShouldSyncTableMetadata(relationId)) { CreateTableMetadataOnWorkers(relationId); @@ -955,7 +953,7 @@ EnsureLocalTableEmpty(Oid relationId) /* * EnsureTableNotDistributed errors out if the table is distributed. */ -static void +void EnsureTableNotDistributed(Oid relationId) { char *relationName = get_rel_name(relationId); diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index dd28a03a4..4f923d552 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -36,22 +36,6 @@ #include "utils/ruleutils.h" #include "utils/syscache.h" -/* - * Flags that can be passed to GetForeignKeyOids to indicate - * which foreign key constraint OIDs are to be extracted - */ -typedef enum ExtractForeignKeyConstrainstMode -{ - /* extract the foreign key OIDs where the table is the referencing one */ - INCLUDE_REFERENCING_CONSTRAINTS = 1 << 0, - - /* extract the foreign key OIDs the table is the referenced one */ - INCLUDE_REFERENCED_CONSTRAINTS = 1 << 1, - - /* exclude the self-referencing foreign keys */ - EXCLUDE_SELF_REFERENCES = 1 << 2 -} ExtractForeignKeyConstraintMode; - /* Local functions forward declarations */ static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid relationId, @@ -67,7 +51,6 @@ static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple, static List * GetForeignConstraintCommandsInternal(Oid relationId, int flags); static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple); static List * GetForeignKeyOidsToReferenceTables(Oid relationId); -static List * GetForeignKeyOids(Oid relationId, int flags); /* * ConstraintIsAForeignKeyToReferenceTable checks if the given constraint is a @@ -674,13 +657,44 @@ FindForeignKeyOidWithName(List *foreignKeyOids, const char *inputConstraintName) } +/* + * ErrorIfTableHasExternalForeignKeys errors out if the relation with relationId + * is involved in a foreign key relationship other than the self-referencing ones. + */ +void +ErrorIfTableHasExternalForeignKeys(Oid relationId) +{ + int flags = (INCLUDE_REFERENCING_CONSTRAINTS | EXCLUDE_SELF_REFERENCES); + List *foreignKeyIdsTableReferencing = GetForeignKeyOids(relationId, flags); + + flags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES); + List *foreignKeyIdsTableReferenced = GetForeignKeyOids(relationId, flags); + + List *foreignKeysWithOtherTables = list_concat(foreignKeyIdsTableReferencing, + foreignKeyIdsTableReferenced); + + if (list_length(foreignKeysWithOtherTables) == 0) + { + return; + } + + const char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("relation \"%s\" is involved in a foreign key relationship " + "with another table", relationName), + errhint("Drop foreign keys with other tables and re-define them " + "with ALTER TABLE commands after the current operation " + "is done."))); +} + + /* * GetForeignKeyOids takes in a relationId, and returns a list of OIDs for * foreign constraints that the relation with relationId is involved according - * to "flags" argument. See ExtractForeignKeyConstrainstMode enum definition + * to "flags" argument. See ExtractForeignKeyConstraintsMode enum definition * for usage of the flags. */ -static List * +List * GetForeignKeyOids(Oid relationId, int flags) { AttrNumber pgConstraintTargetAttrNumber = InvalidAttrNumber; diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index 6051fbc2d..639d1abdd 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -17,6 +17,7 @@ #else #include "access/heapam.h" #include "access/htup_details.h" +#include "access/sysattr.h" #endif #include "catalog/indexing.h" #include "catalog/namespace.h" @@ -28,6 +29,8 @@ #include "distributed/namespace_utils.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" + /* * GetExplicitTriggerCommandList returns the list of DDL commands to create @@ -59,6 +62,71 @@ GetExplicitTriggerCommandList(Oid relationId) } +/* + * GetExplicitTriggerNameList returns a list of trigger names that are explicitly + * created for the table with relationId. See comment of GetExplicitTriggerIdList + * function. + */ +List * +GetExplicitTriggerNameList(Oid relationId) +{ + List *triggerNameList = NIL; + + List *triggerIdList = GetExplicitTriggerIdList(relationId); + + Oid triggerId = InvalidOid; + foreach_oid(triggerId, triggerIdList) + { + char *triggerHame = GetTriggerNameById(triggerId); + triggerNameList = lappend(triggerNameList, triggerHame); + } + + return triggerNameList; +} + + +/* + * GetTriggerNameById returns name of the trigger identified by triggerId if it + * exists. Otherwise, returns NULL. + */ +char * +GetTriggerNameById(Oid triggerId) +{ + Relation pgTrigger = table_open(TriggerRelationId, AccessShareLock); + + int scanKeyCount = 1; + ScanKeyData scanKey[1]; + +#if PG_VERSION_NUM >= PG_VERSION_12 + AttrNumber attrNumber = Anum_pg_trigger_oid; +#else + AttrNumber attrNumber = ObjectIdAttributeNumber; +#endif + + ScanKeyInit(&scanKey[0], attrNumber, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(triggerId)); + + bool useIndex = true; + SysScanDesc scanDescriptor = systable_beginscan(pgTrigger, TriggerOidIndexId, + useIndex, NULL, scanKeyCount, + scanKey); + + char *triggerName = NULL; + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(heapTuple); + triggerName = pstrdup(NameStr(triggerForm->tgname)); + } + + systable_endscan(scanDescriptor); + table_close(pgTrigger, NoLock); + + return triggerName; +} + + /* * GetExplicitTriggerIdList returns a list of OIDs corresponding to the triggers * that are explicitly created on the relation with relationId. That means, diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 93c9b0dd3..34943544d 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -3809,16 +3809,22 @@ ReferenceTableOidList() while (HeapTupleIsValid(heapTuple)) { bool isNull = false; - Datum relationIdDatum = heap_getattr(heapTuple, - Anum_pg_dist_partition_logicalrelid, - tupleDescriptor, &isNull); - Oid relationId = DatumGetObjectId(relationIdDatum); char partitionMethod = heap_getattr(heapTuple, Anum_pg_dist_partition_partmethod, tupleDescriptor, &isNull); + char replicationModel = heap_getattr(heapTuple, + Anum_pg_dist_partition_repmodel, + tupleDescriptor, &isNull); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (partitionMethod == DISTRIBUTE_BY_NONE && + replicationModel == REPLICATION_MODEL_2PC) { + Datum relationIdDatum = heap_getattr(heapTuple, + Anum_pg_dist_partition_logicalrelid, + tupleDescriptor, &isNull); + + Oid relationId = DatumGetObjectId(relationIdDatum); + referenceTableOidList = lappend_oid(referenceTableOidList, relationId); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index cbb33ac05..30ebee3ec 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1036,7 +1036,7 @@ List * SequenceDDLCommandsForTable(Oid relationId) { List *sequenceDDLList = NIL; - List *ownedSequences = getOwnedSequencesCompat(relationId, InvalidAttrNumber); + List *ownedSequences = GetSequencesOwnedByRelation(relationId); char *ownerName = TableOwner(relationId); Oid sequenceOid = InvalidOid; diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 5bc2053b0..bb8312a3e 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -749,7 +749,7 @@ GetTableIndexAndConstraintCommands(Oid relationId) /* - * IndexImpliedByAConstraint is an helper function to be used while scanning + * IndexImpliedByAConstraint is a helper function to be used while scanning * pg_index. It returns true if the index identified by the given indexForm is * implied by a constraint. Note that caller is responsible for passing a valid * indexFrom, which means an alive heap tuple which is of form Form_pg_index. diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index bd83093cd..a9dfbca16 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -204,6 +204,14 @@ ErrorIfTableCannotBeReplicated(Oid relationId) CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); char *relationName = get_rel_name(relationId); + if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("Table %s is a citus local table. Replicating " + "shard of a citus local table currently is not " + "supported", quote_literal_cstr(relationName))))); + } + /* * ShouldSyncTableMetadata() returns true also for reference table, * we don't want to error in that case since reference tables aren't diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 2ce8bb947..c9efa92c2 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -15,6 +15,7 @@ #include "miscadmin.h" #include "commands/dbcommands.h" +#include "distributed/coordinator_protocol.h" #include "distributed/hash_helpers.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -404,6 +405,25 @@ NodeIsPrimaryWorker(WorkerNode *node) } +/* + * CoordinatorAddedAsWorkerNode returns true if coordinator is added to the + * pg_dist_node. This function also acquires RowExclusiveLock on pg_dist_node + * and does not release it to ensure that existency of the coordinator in + * metadata won't be changed until the end of transaction. + */ +bool +CoordinatorAddedAsWorkerNode() +{ + bool groupContainsNodes = false; + + LockRelationOid(DistNodeRelationId(), RowExclusiveLock); + + PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &groupContainsNodes); + + return groupContainsNodes; +} + + /* * ReferenceTablePlacementNodeList returns the set of nodes that should have * reference table placements. This includes all primaries, including the @@ -417,6 +437,45 @@ ReferenceTablePlacementNodeList(LOCKMODE lockMode) } +/* + * CoordinatorNodeIfAddedAsWorkerOrError returns the WorkerNode object for + * coordinator node if it is added to pg_dist_node, otherwise errors out. + * Also, as CoordinatorAddedAsWorkerNode acquires AccessShareLock on pg_dist_node + * and doesn't release it, callers can safely assume coordinator won't be + * removed from metadata until the end of transaction when this function + * returns coordinator node. + */ +WorkerNode * +CoordinatorNodeIfAddedAsWorkerOrError() +{ + ErrorIfCoordinatorNotAddedAsWorkerNode(); + + WorkerNode *coordinatorNode = LookupNodeForGroup(COORDINATOR_GROUP_ID); + + WorkerNode *coordinatorNodeCopy = palloc0(sizeof(WorkerNode)); + *coordinatorNodeCopy = *coordinatorNode; + + return coordinatorNodeCopy; +} + + +/* + * ErrorIfCoordinatorNotAddedAsWorkerNode errors out if coordinator is not added + * to metadata. + */ +void +ErrorIfCoordinatorNotAddedAsWorkerNode() +{ + if (CoordinatorAddedAsWorkerNode()) + { + return; + } + + ereport(ERROR, (errmsg("could not find the coordinator node in " + "metadata as it is not added as a worker"))); +} + + /* * DistributedTablePlacementNodeList returns a list of all active, primary * worker nodes that can store new data, i.e shouldstoreshards is 'true' diff --git a/src/backend/distributed/sql/citus--9.4-1--9.5-1.sql b/src/backend/distributed/sql/citus--9.4-1--9.5-1.sql index 995919765..bccfa5f68 100644 --- a/src/backend/distributed/sql/citus--9.4-1--9.5-1.sql +++ b/src/backend/distributed/sql/citus--9.4-1--9.5-1.sql @@ -2,6 +2,7 @@ -- bump version to 9.5-1 #include "udfs/undistribute_table/9.5-1.sql" +#include "udfs/create_citus_local_table/9.5-1.sql" SET search_path = 'pg_catalog'; diff --git a/src/backend/distributed/sql/downgrades/citus--9.5-1--9.4-1.sql b/src/backend/distributed/sql/downgrades/citus--9.5-1--9.4-1.sql index d7146cba9..5da7b5140 100644 --- a/src/backend/distributed/sql/downgrades/citus--9.5-1--9.4-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--9.5-1--9.4-1.sql @@ -2,6 +2,8 @@ SET search_path = 'pg_catalog'; +DROP FUNCTION create_citus_local_table(table_name regclass); + -- task_tracker_* functions CREATE FUNCTION task_tracker_assign_task(bigint, integer, text) diff --git a/src/backend/distributed/sql/udfs/create_citus_local_table/9.5-1.sql b/src/backend/distributed/sql/udfs/create_citus_local_table/9.5-1.sql new file mode 100644 index 000000000..081228799 --- /dev/null +++ b/src/backend/distributed/sql/udfs/create_citus_local_table/9.5-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.create_citus_local_table(table_name regclass) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_citus_local_table$$; +COMMENT ON FUNCTION pg_catalog.create_citus_local_table(table_name regclass) + IS 'create a citus local table'; diff --git a/src/backend/distributed/sql/udfs/create_citus_local_table/latest.sql b/src/backend/distributed/sql/udfs/create_citus_local_table/latest.sql new file mode 100644 index 000000000..081228799 --- /dev/null +++ b/src/backend/distributed/sql/udfs/create_citus_local_table/latest.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.create_citus_local_table(table_name regclass) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_citus_local_table$$; +COMMENT ON FUNCTION pg_catalog.create_citus_local_table(table_name regclass) + IS 'create a citus local table'; diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 09660f0f3..4c7b101e9 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -19,6 +19,7 @@ #include "catalog/indexing.h" #include "catalog/pg_type.h" #include "commands/sequence.h" +#include "distributed/create_citus_local_table.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/metadata_utility.h" @@ -241,6 +242,13 @@ CreateColocationGroupForRelation(Oid sourceRelationId) static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) { + if (IsCitusTableType(sourceRelationId, CITUS_LOCAL_TABLE) || + IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE)) + { + ereport(ERROR, (errmsg("citus local tables cannot be colocated with " + "other tables"))); + } + CheckReplicationModel(sourceRelationId, targetRelationId); CheckDistributionColumnType(sourceRelationId, targetRelationId); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index be4e3de94..f75b0015e 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -351,6 +351,16 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) errdetail("Relation \"%s\" is already a reference table", relationName))); } + else if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE)) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot upgrade to reference table"), + errdetail("Relation \"%s\" is a citus local table and " + "currently it is not supported to upgrade " + "a citus local table to a reference table ", + relationName))); + } if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING) { diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index 4ea33f8b8..e0c0d0747 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -97,6 +97,25 @@ citus_table_is_visible(PG_FUNCTION_ARGS) } +/* + * ErrorIfRelationIsAKnownShard errors out if the relation with relationId is + * a shard relation. + */ +void +ErrorIfRelationIsAKnownShard(Oid relationId) +{ + /* search the relation in all schemas */ + bool onlySearchPath = false; + if (!RelationIsAKnownShard(relationId, onlySearchPath)) + { + return; + } + + const char *relationName = get_rel_name(relationId); + ereport(ERROR, (errmsg("relation \"%s\" is a shard relation ", relationName))); +} + + /* * RelationIsAKnownShard gets a relationId, check whether it's a shard of * any distributed table. If onlySearchPath is true, then it searches diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index a75be5c1d..bda12b26c 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -48,6 +48,22 @@ typedef struct DistributeObjectOps const DistributeObjectOps * GetDistributeObjectOps(Node *node); +/* + * Flags that can be passed to GetForeignKeyOids to indicate + * which foreign key constraint OIDs are to be extracted + */ +typedef enum ExtractForeignKeyConstraintsMode +{ + /* extract the foreign key OIDs where the table is the referencing one */ + INCLUDE_REFERENCING_CONSTRAINTS = 1 << 0, + + /* extract the foreign key OIDs the table is the referenced one */ + INCLUDE_REFERENCED_CONSTRAINTS = 1 << 1, + + /* exclude the self-referencing foreign keys */ + EXCLUDE_SELF_REFERENCES = 1 << 2 +} ExtractForeignKeyConstraintMode; + /* cluster.c - forward declarations */ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand); @@ -108,6 +124,8 @@ extern bool HasForeignKeyToReferenceTable(Oid relationOid); extern bool TableReferenced(Oid relationOid); extern bool TableReferencing(Oid relationOid); extern bool ConstraintIsAForeignKey(char *inputConstaintName, Oid relationOid); +extern void ErrorIfTableHasExternalForeignKeys(Oid relationId); +extern List * GetForeignKeyOids(Oid relationId, int flags); /* function.c - forward declarations */ @@ -278,6 +296,8 @@ extern void PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumComm /* trigger.c - forward declarations */ extern List * GetExplicitTriggerCommandList(Oid relationId); +extern List * GetExplicitTriggerNameList(Oid relationId); +extern char * GetTriggerNameById(Oid triggerId); extern List * GetExplicitTriggerIdList(Oid relationId); extern Oid get_relation_trigger_oid_compat(HeapTuple heapTuple); extern void ErrorIfUnsupportedCreateTriggerCommand(CreateTrigStmt *createTriggerStmt); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index c5cf04f21..9dd9fb705 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -153,6 +153,7 @@ extern void EnsureHashDistributedTable(Oid relationId); extern void EnsureSequenceOwner(Oid sequenceOid); extern void EnsureFunctionOwner(Oid functionId); extern void EnsureSuperUser(void); +extern void EnsureTableNotDistributed(Oid relationId); extern void EnsureReplicationSettings(Oid relationId, char replicationModel); extern bool RegularTable(Oid relationId); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 69034c110..2942d9aa7 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -34,7 +34,8 @@ #define pg_plan_query_compat(p, q, c, b) pg_plan_query(p, q, c, b) #define planner_compat(p, c, b) planner(p, NULL, c, b) #define standard_planner_compat(a, c, d) standard_planner(a, NULL, c, d) -#define getOwnedSequencesCompat(a, b) getOwnedSequences(a) +#define GetSequencesOwnedByRelation(a) getOwnedSequences(a) +#define GetSequencesOwnedByColumn(a, b) getOwnedSequences_internal(a, b, 0) #define CMDTAG_SELECT_COMPAT CMDTAG_SELECT #define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) \ ExplainOnePlan(a, b, c, d, e, f, g, h) @@ -48,7 +49,8 @@ #define planner_compat(p, c, b) planner(p, c, b) #define standard_planner_compat(a, c, d) standard_planner(a, c, d) #define CMDTAG_SELECT_COMPAT "SELECT" -#define getOwnedSequencesCompat(a, b) getOwnedSequences(a, b) +#define GetSequencesOwnedByRelation(a) getOwnedSequences(a, InvalidAttrNumber) +#define GetSequencesOwnedByColumn(a, b) getOwnedSequences(a, b) #define ExplainOnePlanCompat(a, b, c, d, e, f, g, h) ExplainOnePlan(a, b, c, d, e, f, g) #define SetListCellPtr(a, b) ((a)->data.ptr_value = (b)) #define RangeTableEntryFromNSItem(a) (a) diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 0fcf75334..753ab9f83 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -73,7 +73,10 @@ extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); +extern bool CoordinatorAddedAsWorkerNode(void); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); +extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); +extern void ErrorIfCoordinatorNotAddedAsWorkerNode(void); extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); extern bool NodeCanHaveDistTablePlacements(WorkerNode *node); extern uint32 ActiveReadableNonCoordinatorNodeCount(void); diff --git a/src/include/distributed/worker_shard_visibility.h b/src/include/distributed/worker_shard_visibility.h index 871421eb7..46f807fd8 100644 --- a/src/include/distributed/worker_shard_visibility.h +++ b/src/include/distributed/worker_shard_visibility.h @@ -17,6 +17,7 @@ extern bool OverrideTableVisibility; extern void ReplaceTableVisibleFunction(Node *inputNode); +extern void ErrorIfRelationIsAKnownShard(Oid relationId); extern bool RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath); diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 59955a5a6..93ee53e58 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -34,6 +34,9 @@ s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g # shard table names for custom_aggregate_support s/ daily_uniques_[0-9]+ / daily_uniques_xxxxxxx /g +# shard table names for isolation_create_citus_local_table +s/ "citus_local_table_([0-9]+)_[0-9]+" / "citus_local_table_\1_xxxxxxx" /g + # In foreign_key_restriction_enforcement, normalize shard names s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g diff --git a/src/test/regress/expected/citus_local_tables.out b/src/test/regress/expected/citus_local_tables.out new file mode 100644 index 000000000..89e40d1ec --- /dev/null +++ b/src/test/regress/expected/citus_local_tables.out @@ -0,0 +1,441 @@ +\set VERBOSITY terse +SET citus.next_shard_id TO 1504000; +SET citus.shard_replication_factor TO 1; +SET citus.enable_local_execution TO ON; +SET citus.log_local_commands TO ON; +CREATE SCHEMA citus_local_tables_test_schema; +SET search_path TO citus_local_tables_test_schema; +--------------------------------------------------------------------- +------- citus local table creation ------- +--------------------------------------------------------------------- +-- ensure that coordinator is added to pg_dist_node +SET client_min_messages to ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +CREATE TABLE citus_local_table_1 (a int); +-- this should work as coordinator is added to pg_dist_node +SELECT create_citus_local_table('citus_local_table_1'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- try to remove coordinator and observe failure as there exist a citus local table +SELECT 1 FROM master_remove_node('localhost', :master_port); +ERROR: you cannot remove the primary node of a node group which has shard placements +DROP TABLE citus_local_table_1; +NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_1_xxxxx CASCADE +-- this should work now as the citus local table is dropped +SELECT 1 FROM master_remove_node('localhost', :master_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +CREATE TABLE citus_local_table_1 (a int primary key); +-- this should fail as coordinator is removed from pg_dist_node +SELECT create_citus_local_table('citus_local_table_1'); +ERROR: could not find the coordinator node in metadata as it is not added as a worker +-- let coordinator have citus local tables again for next tests +set client_min_messages to ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +-- creating citus local table having no data initially would work +SELECT create_citus_local_table('citus_local_table_1'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- creating citus local table having data in it would also work +CREATE TABLE citus_local_table_2(a int primary key); +INSERT INTO citus_local_table_2 VALUES(1); +SELECT create_citus_local_table('citus_local_table_2'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- also create indexes on them +CREATE INDEX citus_local_table_1_idx ON citus_local_table_1(a); +NOTICE: executing the command locally: CREATE INDEX citus_local_table_1_idx_1504001 ON citus_local_tables_test_schema.citus_local_table_1_1504001 USING btree (a ) +CREATE INDEX citus_local_table_2_idx ON citus_local_table_2(a); +NOTICE: executing the command locally: CREATE INDEX citus_local_table_2_idx_1504002 ON citus_local_tables_test_schema.citus_local_table_2_1504002 USING btree (a ) +-- drop them for next tests +DROP TABLE citus_local_table_1, citus_local_table_2; +NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_2_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_1_xxxxx CASCADE +-- create indexes before creating the citus local tables +-- .. for an initially empty table +CREATE TABLE citus_local_table_1(a int); +CREATE INDEX citus_local_table_1_idx ON citus_local_table_1(a); +SELECT create_citus_local_table('citus_local_table_1'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- .. and for another table having data in it before creating citus local table +CREATE TABLE citus_local_table_2(a int); +INSERT INTO citus_local_table_2 VALUES(1); +CREATE INDEX citus_local_table_2_idx ON citus_local_table_2(a); +SELECT create_citus_local_table('citus_local_table_2'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table (a int); +SELECT create_distributed_table('distributed_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- cannot create citus local table from an existing citus table +SELECT create_citus_local_table('distributed_table'); +ERROR: table "distributed_table" is already distributed +-- partitioned table tests -- +CREATE TABLE partitioned_table(a int, b int) PARTITION BY RANGE (a); +CREATE TABLE partitioned_table_1 PARTITION OF partitioned_table FOR VALUES FROM (0) TO (10); +CREATE TABLE partitioned_table_2 PARTITION OF partitioned_table FOR VALUES FROM (10) TO (20); +-- cannot create partitioned citus local tables +SELECT create_citus_local_table('partitioned_table'); +ERROR: cannot create citus local table "partitioned_table", only regular tables and foreign tables are supported for citus local table creation +BEGIN; + CREATE TABLE citus_local_table PARTITION OF partitioned_table FOR VALUES FROM (20) TO (30); + -- cannot create citus local table as a partition of a local table + SELECT create_citus_local_table('citus_local_table'); +ERROR: cannot create citus local table "citus_local_table", citus local tables cannot be partition of other tables +ROLLBACK; +BEGIN; + CREATE TABLE citus_local_table (a int, b int); + SELECT create_citus_local_table('citus_local_table'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + + -- cannot create citus local table as a partition of a local table + -- via ALTER TABLE commands as well + ALTER TABLE partitioned_table ATTACH PARTITION citus_local_table FOR VALUES FROM (20) TO (30); +ERROR: non-distributed tables cannot have distributed partitions +ROLLBACK; +BEGIN; + SELECT create_distributed_table('partitioned_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + CREATE TABLE citus_local_table (a int, b int); + SELECT create_citus_local_table('citus_local_table'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + + -- cannot attach citus local table to a partitioned distributed table + ALTER TABLE partitioned_table ATTACH PARTITION citus_local_table FOR VALUES FROM (20) TO (30); +ERROR: distributed tables cannot have non-colocated distributed tables as a partition +ROLLBACK; +-- show that we do not support inheritance relationships -- +CREATE TABLE parent_table (a int, b text); +CREATE TABLE child_table () INHERITS (parent_table); +-- both of below should error out +SELECT create_citus_local_table('parent_table'); +ERROR: cannot create citus local table "parent_table", citus local tables cannot be involved in inheritance relationships +SELECT create_citus_local_table('child_table'); +ERROR: cannot create citus local table "child_table", citus local tables cannot be involved in inheritance relationships +-- show that we support UNLOGGED tables -- +CREATE UNLOGGED TABLE unlogged_table (a int primary key); +SELECT create_citus_local_table('unlogged_table'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- show that we allow triggers -- +BEGIN; + CREATE TABLE citus_local_table_3 (value int); + -- create a simple function to be invoked by trigger + CREATE FUNCTION update_value() RETURNS trigger AS $update_value$ + BEGIN + UPDATE citus_local_table_3 SET value=value+1; + RETURN NEW; + END; + $update_value$ LANGUAGE plpgsql; + CREATE TRIGGER insert_trigger + AFTER INSERT ON citus_local_table_3 + FOR EACH STATEMENT EXECUTE PROCEDURE update_value(); + SELECT create_citus_local_table('citus_local_table_3'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + + INSERT INTO citus_local_table_3 VALUES (1); +NOTICE: executing the command locally: INSERT INTO citus_local_tables_test_schema.citus_local_table_3_1504024 (value) VALUES (1) +NOTICE: executing the command locally: UPDATE citus_local_tables_test_schema.citus_local_table_3_1504024 citus_local_table_3 SET value = (value OPERATOR(pg_catalog.+) 1) + -- show that trigger is executed only once, we should see "2" (not "3") + SELECT * FROM citus_local_table_3; +NOTICE: executing the command locally: SELECT value FROM citus_local_tables_test_schema.citus_local_table_3_1504024 citus_local_table_3 + value +--------------------------------------------------------------------- + 2 +(1 row) + +ROLLBACK; +-- show that we do not support policies in citus community -- +BEGIN; + CREATE TABLE citus_local_table_3 (table_user text); + ALTER TABLE citus_local_table_3 ENABLE ROW LEVEL SECURITY; + CREATE ROLE table_users; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes + CREATE POLICY table_policy ON citus_local_table_3 TO table_users + USING (table_user = current_user); + -- this should error out + SELECT create_citus_local_table('citus_local_table_3'); +ERROR: policies on distributed tables are only supported in Citus Enterprise +ROLLBACK; +-- show that we properly handle sequences on citus local tables -- +BEGIN; + CREATE SEQUENCE col3_seq; + CREATE TABLE citus_local_table_3 (col1 serial, col2 int, col3 int DEFAULT nextval('col3_seq')); + SELECT create_citus_local_table('citus_local_table_3'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + + -- print column default expressions + -- we should only see shell relation below + SELECT table_name, column_name, column_default + FROM information_schema.COLUMNS + WHERE table_name like 'citus_local_table_3%' and column_default != '' ORDER BY 1,2; + table_name | column_name | column_default +--------------------------------------------------------------------- + citus_local_table_3 | col1 | nextval('citus_local_table_3_col1_seq'::regclass) + citus_local_table_3 | col3 | nextval('col3_seq'::regclass) +(2 rows) + + -- print sequence ownerships + -- show that the only internal sequence is on col1 and it is owned by shell relation + SELECT s.relname as sequence_name, t.relname, a.attname + FROM pg_class s + JOIN pg_depend d on d.objid=s.oid and d.classid='pg_class'::regclass and d.refclassid='pg_class'::regclass + JOIN pg_class t on t.oid=d.refobjid + JOIN pg_attribute a on a.attrelid=t.oid and a.attnum=d.refobjsubid + WHERE s.relkind='S' and s.relname like 'citus_local_table_3%' ORDER BY 1,2; + sequence_name | relname | attname +--------------------------------------------------------------------- + citus_local_table_3_col1_seq | citus_local_table_3 | col1 +(1 row) + +ROLLBACK; +-- test foreign tables using fake FDW -- +CREATE FOREIGN TABLE foreign_table ( + id bigint not null, + full_name text not null default '' +) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); +-- observe that we do not create fdw server for shell table, both shard relation +-- & shell relation points to the same same server object +SELECT create_citus_local_table('foreign_table'); +NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined +NOTICE: server "fake_fdw_server" already exists, skipping +NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- drop them for next tests +DROP TABLE citus_local_table_1, citus_local_table_2, distributed_table; +NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_2_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_1_xxxxx CASCADE +-- create test tables +CREATE TABLE citus_local_table_1 (a int primary key); +SELECT create_citus_local_table('citus_local_table_1'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE citus_local_table_2 (a int primary key); +SELECT create_citus_local_table('citus_local_table_2'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE local_table (a int primary key); +CREATE TABLE distributed_table (a int primary key); +SELECT create_distributed_table('distributed_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE reference_table (a int primary key); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- show that colociation of citus local tables are not supported for now +-- between citus local tables +SELECT mark_tables_colocated('citus_local_table_1', ARRAY['citus_local_table_2']); +ERROR: citus local tables cannot be colocated with other tables +-- between citus local tables and reference tables +SELECT mark_tables_colocated('citus_local_table_1', ARRAY['reference_table']); +ERROR: citus local tables cannot be colocated with other tables +SELECT mark_tables_colocated('reference_table', ARRAY['citus_local_table_1']); +ERROR: citus local tables cannot be colocated with other tables +-- between citus local tables and distributed tables +SELECT mark_tables_colocated('citus_local_table_1', ARRAY['distributed_table']); +ERROR: citus local tables cannot be colocated with other tables +SELECT mark_tables_colocated('distributed_table', ARRAY['citus_local_table_1']); +ERROR: citus local tables cannot be colocated with other tables +-- upgrade_to_reference_table is not supported +SELECT upgrade_to_reference_table('citus_local_table_1'); +ERROR: cannot upgrade to reference table +-- master_create_empty_shard is not supported +SELECT master_create_empty_shard('citus_local_table_1'); +ERROR: relation "citus_local_table_1" is a citus local table +-- get_shard_id_for_distribution_column is supported +SELECT get_shard_id_for_distribution_column('citus_local_table_1', 'not_checking_this_arg_for_non_dist_tables'); + get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1504027 +(1 row) + +SELECT get_shard_id_for_distribution_column('citus_local_table_1'); + get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1504027 +(1 row) + +-- master_copy_shard_placement is not supported +SELECT master_copy_shard_placement(shardid, 'localhost', :master_port, 'localhost', :worker_1_port, true) +FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='citus_local_table_1'::regclass) as shardid; +ERROR: Table 'citus_local_table_1' is a citus local table. Replicating shard of a citus local table currently is not supported +-- undistribute_table is supported +BEGIN; + SELECT undistribute_table('citus_local_table_1'); +NOTICE: Creating a new local table for citus_local_tables_test_schema.citus_local_table_1 +NOTICE: Moving the data of citus_local_tables_test_schema.citus_local_table_1 +NOTICE: executing the command locally: SELECT a FROM citus_local_tables_test_schema.citus_local_table_1_1504027 citus_local_table_1 +NOTICE: Dropping the old citus_local_tables_test_schema.citus_local_table_1 +NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_1_xxxxx CASCADE +NOTICE: Renaming the new table to citus_local_tables_test_schema.citus_local_table_1 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; +-- tests with citus local tables initially having foreign key relationships +CREATE TABLE local_table_1 (a int primary key); +CREATE TABLE local_table_2 (a int primary key references local_table_1(a)); +CREATE TABLE local_table_3 (a int primary key, b int references local_table_3(a)); +-- below two should fail as we do not allow foreign keys between +-- postgres local tables and citus local tables +SELECT create_citus_local_table('local_table_1'); +ERROR: relation "local_table_1" is involved in a foreign key relationship with another table +SELECT create_citus_local_table('local_table_2'); +ERROR: relation "local_table_2" is involved in a foreign key relationship with another table +-- below should work as we allow initial self references in citus local tables +SELECT create_citus_local_table('local_table_3'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +--------------------------------------------------------------------- +----- tests for object names that should be escaped properly ----- +--------------------------------------------------------------------- +CREATE SCHEMA "CiTUS!LocalTables"; +-- create table with weird names +CREATE TABLE "CiTUS!LocalTables"."LocalTabLE.1!?!"(id int, "TeNANt_Id" int); +-- should work +SELECT create_citus_local_table('"CiTUS!LocalTables"."LocalTabLE.1!?!"'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- drop the table before creating it when the search path is set +SET search_path to "CiTUS!LocalTables" ; +DROP TABLE "LocalTabLE.1!?!"; +NOTICE: executing the command locally: DROP TABLE IF EXISTS "CiTUS!LocalTables"."LocalTabLE.1!?!_1504035" CASCADE +-- have a custom type in the local table +CREATE TYPE local_type AS (key int, value jsonb); +-- create btree_gist for GiST index +CREATE EXTENSION btree_gist; +CREATE TABLE "LocalTabLE.1!?!"( + id int PRIMARY KEY, + "TeNANt_Id" int, + "local_Type" local_type, + "jsondata" jsonb NOT NULL, + name text, + price numeric CHECK (price > 0), + serial_data bigserial, UNIQUE (id, price), + EXCLUDE USING GIST (name WITH =)); +-- create some objects before create_citus_local_table +CREATE INDEX "my!Index1" ON "LocalTabLE.1!?!"(id) WITH ( fillfactor = 80 ) WHERE id > 10; +CREATE UNIQUE INDEX uniqueIndex ON "LocalTabLE.1!?!" (id); +-- ingest some data before create_citus_local_table +INSERT INTO "LocalTabLE.1!?!" VALUES (1, 1, (1, row_to_json(row(1,1)))::local_type, row_to_json(row(1,1), true)), + (2, 1, (2, row_to_json(row(2,2)))::local_type, row_to_json(row(2,2), 'false')); +-- create a replica identity before create_citus_local_table +ALTER TABLE "LocalTabLE.1!?!" REPLICA IDENTITY USING INDEX uniqueIndex; +-- this shouldn't give any syntax errors +SELECT create_citus_local_table('"LocalTabLE.1!?!"'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- create some objects after create_citus_local_table +CREATE INDEX "my!Index2" ON "LocalTabLE.1!?!"(id) WITH ( fillfactor = 90 ) WHERE id < 20; +NOTICE: executing the command locally: CREATE INDEX "my!Index2_1504036" ON "CiTUS!LocalTables"."LocalTabLE.1!?!_1504036" USING btree (id ) WITH (fillfactor = '90' )WHERE (id < 20) +CREATE UNIQUE INDEX uniqueIndex2 ON "LocalTabLE.1!?!"(id); +NOTICE: executing the command locally: CREATE UNIQUE INDEX uniqueindex2_1504036 ON "CiTUS!LocalTables"."LocalTabLE.1!?!_1504036" USING btree (id ) +--------------------------------------------------------------------- +---- utility command execution ---- +--------------------------------------------------------------------- +SET search_path TO citus_local_tables_test_schema; +-- any foreign key between citus local tables and other tables cannot be set for now +-- most should error out (for now with meaningless error messages) +-- between citus local tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(a) references citus_local_table_2(a); +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1504027, 'citus_local_tables_test_schema', 1504028, 'citus_local_tables_test_schema', 'ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(a) references citus_local_table_2(a);') +-- between citus local tables and reference tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_ref FOREIGN KEY(a) references reference_table(a); +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1504027, 'citus_local_tables_test_schema', 1504033, 'citus_local_tables_test_schema', 'ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_ref FOREIGN KEY(a) references reference_table(a);') +ALTER TABLE reference_table ADD CONSTRAINT fkey_ref_to_c FOREIGN KEY(a) references citus_local_table_1(a); +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1504033, 'citus_local_tables_test_schema', 1504027, 'citus_local_tables_test_schema', 'ALTER TABLE reference_table ADD CONSTRAINT fkey_ref_to_c FOREIGN KEY(a) references citus_local_table_1(a);') +ERROR: relation "citus_local_tables_test_schema.citus_local_table_1_1504027" does not exist +-- between citus local tables and distributed tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_dist FOREIGN KEY(a) references distributed_table(a); +ERROR: cannot create foreign key constraint since foreign keys from reference tables to distributed tables are not supported +ALTER TABLE distributed_table ADD CONSTRAINT fkey_dist_to_c FOREIGN KEY(a) references citus_local_table_1(a); +ERROR: relation "citus_local_tables_test_schema.citus_local_table_1_1504027" does not exist +-- between citus local tables and local tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_local FOREIGN KEY(a) references local_table(a); +ERROR: referenced table "local_table" must be a distributed table or a reference table +ALTER TABLE local_table ADD CONSTRAINT fkey_local_to_c FOREIGN KEY(a) references citus_local_table_1(a); +-- cleanup at exit +DROP SCHEMA citus_local_tables_test_schema, "CiTUS!LocalTables" CASCADE; +NOTICE: drop cascades to 23 other objects diff --git a/src/test/regress/expected/isolation_create_citus_local_table.out b/src/test/regress/expected/isolation_create_citus_local_table.out new file mode 100644 index 000000000..1643ee521 --- /dev/null +++ b/src/test/regress/expected/isolation_create_citus_local_table.out @@ -0,0 +1,215 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s1-create-citus-local-table-1 s2-create-citus-local-table-1 s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +create_citus_local_table + + +step s2-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +step s1-commit: COMMIT; +step s2-create-citus-local-table-1: <... completed> +error in steps s1-commit s2-create-citus-local-table-1: ERROR: relation "citus_local_table_1_xxxxxxx" is a shard relation +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-create-citus-local-table-3 s2-create-citus-local-table-3 s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-3: SELECT create_citus_local_table('another_schema.citus_local_table_3'); +create_citus_local_table + + +step s2-create-citus-local-table-3: SELECT create_citus_local_table('another_schema.citus_local_table_3'); +step s1-commit: COMMIT; +step s2-create-citus-local-table-3: <... completed> +error in steps s1-commit s2-create-citus-local-table-3: ERROR: relation "citus_local_table_3_xxxxxxx" is a shard relation +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-create-citus-local-table-1 s2-create-citus-local-table-1 s1-rollback s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +create_citus_local_table + + +step s2-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +step s1-rollback: ROLLBACK; +step s2-create-citus-local-table-1: <... completed> +create_citus_local_table + + +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-create-citus-local-table-1 s2-select s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +create_citus_local_table + + +step s2-select: SELECT * FROM citus_local_table_1; +step s1-commit: COMMIT; +step s2-select: <... completed> +a + +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-create-citus-local-table-1 s2-update s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +create_citus_local_table + + +step s2-update: UPDATE citus_local_table_1 SET a=1 WHERE a=2; +step s1-commit: COMMIT; +step s2-update: <... completed> +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-create-citus-local-table-1 s2-truncate s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +create_citus_local_table + + +step s2-truncate: TRUNCATE citus_local_table_1; +step s1-commit: COMMIT; +step s2-truncate: <... completed> +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s2-create-citus-local-table-2 s1-begin s2-begin s1-create-citus-local-table-1 s2-fkey-to-another s1-commit s2-commit +step s2-create-citus-local-table-2: SELECT create_citus_local_table('citus_local_table_2'); +create_citus_local_table + + +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +create_citus_local_table + + +step s2-fkey-to-another: ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(a) REFERENCES citus_local_table_2(a); +step s1-commit: COMMIT; +step s2-fkey-to-another: <... completed> +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-create-citus-local-table-1 s2-remove-coordinator s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +create_citus_local_table + + +step s2-remove-coordinator: SELECT master_remove_node('localhost', 57636); +step s1-commit: COMMIT; +step s2-remove-coordinator: <... completed> +error in steps s1-commit s2-remove-coordinator: ERROR: you cannot remove the primary node of a node group which has shard placements +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-drop-table s2-create-citus-local-table-1 s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-drop-table: DROP TABLE citus_local_table_1; +step s2-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +step s1-commit: COMMIT; +step s2-create-citus-local-table-1: <... completed> +error in steps s1-commit s2-create-citus-local-table-1: ERROR: cannot create citus local table, relation does not exist +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-delete s2-create-citus-local-table-1 s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-delete: DELETE FROM citus_local_table_1 WHERE a=2; +step s2-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +step s1-commit: COMMIT; +step s2-create-citus-local-table-1: <... completed> +create_citus_local_table + + +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-select s2-create-citus-local-table-1 s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-select: SELECT * FROM citus_local_table_1; +a + +step s2-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +step s1-commit: COMMIT; +step s2-create-citus-local-table-1: <... completed> +create_citus_local_table + + +step s2-commit: COMMIT; +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-remove-coordinator s2-create-citus-local-table-1 s1-commit s2-commit +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-remove-coordinator: SELECT master_remove_node('localhost', 57636); +master_remove_node + + +step s2-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +step s1-commit: COMMIT; +step s2-create-citus-local-table-1: <... completed> +error in steps s1-commit s2-create-citus-local-table-1: ERROR: could not find the coordinator node in metadata as it is not added as a worker +step s2-commit: COMMIT; +master_remove_node + + +starting permutation: s1-remove-coordinator s1-begin s2-begin s1-add-coordinator s2-create-citus-local-table-1 s1-commit s2-commit +step s1-remove-coordinator: SELECT master_remove_node('localhost', 57636); +master_remove_node + + +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-add-coordinator: SELECT 1 FROM master_add_node('localhost', 57636, 0); +?column? + +1 +step s2-create-citus-local-table-1: SELECT create_citus_local_table('citus_local_table_1'); +step s1-commit: COMMIT; +step s2-create-citus-local-table-1: <... completed> +create_citus_local_table + + +step s2-commit: COMMIT; +master_remove_node + + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 3ca8d9fba..85776f3a6 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -426,8 +426,9 @@ SELECT * FROM print_extension_changes(); function task_tracker_task_status(bigint,integer) | function worker_execute_sql_task(bigint,integer,text,boolean) | function worker_merge_files_and_run_query(bigint,integer,text,text) | + | function create_citus_local_table(regclass) | function undistribute_table(regclass) -(7 rows) +(8 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_mx_create_table.out b/src/test/regress/expected/multi_mx_create_table.out index 9db4ce9e0..477675c8f 100644 --- a/src/test/regress/expected/multi_mx_create_table.out +++ b/src/test/regress/expected/multi_mx_create_table.out @@ -58,6 +58,11 @@ CREATE TYPE citus_mx_test_schema.new_composite_type as (key1 text, key2 text); CREATE TYPE order_side_mx AS ENUM ('buy', 'sell'); -- now create required stuff in the worker 1 \c - - - :worker_1_port +-- show that we do not support creating citus local tables from mx workers for now +CREATE TABLE citus_local_table(a int); +SELECT create_citus_local_table('citus_local_table'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. -- create schema to test schema support CREATE SCHEMA citus_mx_test_schema_join_1; CREATE SCHEMA citus_mx_test_schema_join_2; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 129ab3109..e734870ee 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -33,6 +33,7 @@ test: isolation_dump_local_wait_edges test: isolation_replace_wait_function test: isolation_distributed_deadlock_detection test: isolation_replicate_reference_tables_to_coordinator +test: isolation_create_citus_local_table # creating a restore point briefly blocks all # writes, run this test serially. diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 5888b2ad7..c0452abd2 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -304,6 +304,7 @@ test: foreign_key_to_reference_table test: replicate_reference_tables_to_coordinator test: coordinator_shouldhaveshards test: local_shard_utility_command_execution +test: citus_local_tables test: remove_coordinator diff --git a/src/test/regress/spec/isolation_create_citus_local_table.spec b/src/test/regress/spec/isolation_create_citus_local_table.spec new file mode 100644 index 000000000..fc352afdc --- /dev/null +++ b/src/test/regress/spec/isolation_create_citus_local_table.spec @@ -0,0 +1,74 @@ +setup +{ + SELECT 1 FROM master_add_node('localhost', 57636, 0); + CREATE TABLE citus_local_table_1(a int); + CREATE TABLE citus_local_table_2(a int unique); + + CREATE SCHEMA another_schema; + CREATE TABLE another_schema.citus_local_table_3(a int unique); +} + +teardown +{ + DROP TABLE IF EXISTS citus_local_table_1, citus_local_table_2 CASCADE; + DROP SCHEMA IF EXISTS another_schema CASCADE; + -- remove coordinator only if it is added to pg_dist_node + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE nodeport=57636; +} + +session "s1" + +step "s1-begin" { BEGIN; } +step "s1-create-citus-local-table-1" { SELECT create_citus_local_table('citus_local_table_1'); } +step "s1-create-citus-local-table-3" { SELECT create_citus_local_table('another_schema.citus_local_table_3'); } +step "s1-drop-table" { DROP TABLE citus_local_table_1; } +step "s1-delete" { DELETE FROM citus_local_table_1 WHERE a=2; } +step "s1-select" { SELECT * FROM citus_local_table_1; } +step "s1-remove-coordinator" { SELECT master_remove_node('localhost', 57636); } +step "s1-add-coordinator" { SELECT 1 FROM master_add_node('localhost', 57636, 0); } +step "s1-commit" { COMMIT; } +step "s1-rollback" { ROLLBACK; } + +session "s2" + +step "s2-begin" { BEGIN; } +step "s2-create-citus-local-table-1" { SELECT create_citus_local_table('citus_local_table_1'); } +step "s2-create-citus-local-table-2" { SELECT create_citus_local_table('citus_local_table_2'); } +step "s2-create-citus-local-table-3" { SELECT create_citus_local_table('another_schema.citus_local_table_3'); } +step "s2-select" { SELECT * FROM citus_local_table_1; } +step "s2-update" { UPDATE citus_local_table_1 SET a=1 WHERE a=2; } +step "s2-truncate" { TRUNCATE citus_local_table_1; } +step "s2-fkey-to-another" { ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(a) REFERENCES citus_local_table_2(a); } +step "s2-remove-coordinator" { SELECT master_remove_node('localhost', 57636); } +step "s2-commit" { COMMIT; } + + +// create_citus_local_table vs command/query // + +// Second session should error out as the table becomes a citus local table after the first session commits .. +permutation "s1-begin" "s2-begin" "s1-create-citus-local-table-1" "s2-create-citus-local-table-1" "s1-commit" "s2-commit" +// .. and it should error out even if we are in a different schema than the table +permutation "s1-begin" "s2-begin" "s1-create-citus-local-table-3" "s2-create-citus-local-table-3" "s1-commit" "s2-commit" +// Second session should be able to create citus local table as the first one rollbacks +permutation "s1-begin" "s2-begin" "s1-create-citus-local-table-1" "s2-create-citus-local-table-1" "s1-rollback" "s2-commit" +// Any modifying queries, DML commands and SELECT will block +permutation "s1-begin" "s2-begin" "s1-create-citus-local-table-1" "s2-select" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-create-citus-local-table-1" "s2-update" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-create-citus-local-table-1" "s2-truncate" "s1-commit" "s2-commit" +// Foreign key creation should succeed as it will be blocked until first session creates citus local table +permutation "s2-create-citus-local-table-2" "s1-begin" "s2-begin" "s1-create-citus-local-table-1" "s2-fkey-to-another" "s1-commit" "s2-commit" +// master_remove_node should first block and then fail +permutation "s1-begin" "s2-begin" "s1-create-citus-local-table-1" "s2-remove-coordinator" "s1-commit" "s2-commit" + + +// command/query vs create_citus_local_table // + +// create_citus_local_table_1 should first block and then error out as the first session drops the table +permutation "s1-begin" "s2-begin" "s1-drop-table" "s2-create-citus-local-table-1" "s1-commit" "s2-commit" +// Any modifying queries, DML commands and SELECT will block +permutation "s1-begin" "s2-begin" "s1-delete" "s2-create-citus-local-table-1" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-select" "s2-create-citus-local-table-1" "s1-commit" "s2-commit" +// create_citus_local_table should block on master_remove_node and then fail +permutation "s1-begin" "s2-begin" "s1-remove-coordinator" "s2-create-citus-local-table-1" "s1-commit" "s2-commit" +// create_citus_local_table should block on master_add_node and then succeed +permutation "s1-remove-coordinator" "s1-begin" "s2-begin" "s1-add-coordinator" "s2-create-citus-local-table-1" "s1-commit" "s2-commit" diff --git a/src/test/regress/sql/citus_local_tables.sql b/src/test/regress/sql/citus_local_tables.sql new file mode 100644 index 000000000..cf9cff095 --- /dev/null +++ b/src/test/regress/sql/citus_local_tables.sql @@ -0,0 +1,337 @@ +\set VERBOSITY terse + +SET citus.next_shard_id TO 1504000; +SET citus.shard_replication_factor TO 1; +SET citus.enable_local_execution TO ON; +SET citus.log_local_commands TO ON; + +CREATE SCHEMA citus_local_tables_test_schema; +SET search_path TO citus_local_tables_test_schema; + +------------------------------------------ +------- citus local table creation ------- +------------------------------------------ + +-- ensure that coordinator is added to pg_dist_node +SET client_min_messages to ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); +RESET client_min_messages; + +CREATE TABLE citus_local_table_1 (a int); + +-- this should work as coordinator is added to pg_dist_node +SELECT create_citus_local_table('citus_local_table_1'); + +-- try to remove coordinator and observe failure as there exist a citus local table +SELECT 1 FROM master_remove_node('localhost', :master_port); + +DROP TABLE citus_local_table_1; + +-- this should work now as the citus local table is dropped +SELECT 1 FROM master_remove_node('localhost', :master_port); + +CREATE TABLE citus_local_table_1 (a int primary key); + +-- this should fail as coordinator is removed from pg_dist_node +SELECT create_citus_local_table('citus_local_table_1'); + +-- let coordinator have citus local tables again for next tests +set client_min_messages to ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); +RESET client_min_messages; + +-- creating citus local table having no data initially would work +SELECT create_citus_local_table('citus_local_table_1'); + +-- creating citus local table having data in it would also work +CREATE TABLE citus_local_table_2(a int primary key); +INSERT INTO citus_local_table_2 VALUES(1); + +SELECT create_citus_local_table('citus_local_table_2'); + +-- also create indexes on them +CREATE INDEX citus_local_table_1_idx ON citus_local_table_1(a); +CREATE INDEX citus_local_table_2_idx ON citus_local_table_2(a); + +-- drop them for next tests +DROP TABLE citus_local_table_1, citus_local_table_2; + +-- create indexes before creating the citus local tables + +-- .. for an initially empty table +CREATE TABLE citus_local_table_1(a int); +CREATE INDEX citus_local_table_1_idx ON citus_local_table_1(a); +SELECT create_citus_local_table('citus_local_table_1'); + +-- .. and for another table having data in it before creating citus local table +CREATE TABLE citus_local_table_2(a int); +INSERT INTO citus_local_table_2 VALUES(1); +CREATE INDEX citus_local_table_2_idx ON citus_local_table_2(a); +SELECT create_citus_local_table('citus_local_table_2'); + +CREATE TABLE distributed_table (a int); +SELECT create_distributed_table('distributed_table', 'a'); + +-- cannot create citus local table from an existing citus table +SELECT create_citus_local_table('distributed_table'); + +-- partitioned table tests -- + +CREATE TABLE partitioned_table(a int, b int) PARTITION BY RANGE (a); +CREATE TABLE partitioned_table_1 PARTITION OF partitioned_table FOR VALUES FROM (0) TO (10); +CREATE TABLE partitioned_table_2 PARTITION OF partitioned_table FOR VALUES FROM (10) TO (20); + +-- cannot create partitioned citus local tables +SELECT create_citus_local_table('partitioned_table'); + +BEGIN; + CREATE TABLE citus_local_table PARTITION OF partitioned_table FOR VALUES FROM (20) TO (30); + + -- cannot create citus local table as a partition of a local table + SELECT create_citus_local_table('citus_local_table'); +ROLLBACK; + +BEGIN; + CREATE TABLE citus_local_table (a int, b int); + + SELECT create_citus_local_table('citus_local_table'); + + -- cannot create citus local table as a partition of a local table + -- via ALTER TABLE commands as well + ALTER TABLE partitioned_table ATTACH PARTITION citus_local_table FOR VALUES FROM (20) TO (30); +ROLLBACK; + +BEGIN; + SELECT create_distributed_table('partitioned_table', 'a'); + + CREATE TABLE citus_local_table (a int, b int); + SELECT create_citus_local_table('citus_local_table'); + + -- cannot attach citus local table to a partitioned distributed table + ALTER TABLE partitioned_table ATTACH PARTITION citus_local_table FOR VALUES FROM (20) TO (30); +ROLLBACK; + +-- show that we do not support inheritance relationships -- + +CREATE TABLE parent_table (a int, b text); +CREATE TABLE child_table () INHERITS (parent_table); + +-- both of below should error out +SELECT create_citus_local_table('parent_table'); +SELECT create_citus_local_table('child_table'); + +-- show that we support UNLOGGED tables -- + +CREATE UNLOGGED TABLE unlogged_table (a int primary key); +SELECT create_citus_local_table('unlogged_table'); + +-- show that we allow triggers -- + +BEGIN; + CREATE TABLE citus_local_table_3 (value int); + + -- create a simple function to be invoked by trigger + CREATE FUNCTION update_value() RETURNS trigger AS $update_value$ + BEGIN + UPDATE citus_local_table_3 SET value=value+1; + RETURN NEW; + END; + $update_value$ LANGUAGE plpgsql; + + CREATE TRIGGER insert_trigger + AFTER INSERT ON citus_local_table_3 + FOR EACH STATEMENT EXECUTE PROCEDURE update_value(); + + SELECT create_citus_local_table('citus_local_table_3'); + + INSERT INTO citus_local_table_3 VALUES (1); + + -- show that trigger is executed only once, we should see "2" (not "3") + SELECT * FROM citus_local_table_3; +ROLLBACK; + +-- show that we do not support policies in citus community -- + +BEGIN; + CREATE TABLE citus_local_table_3 (table_user text); + + ALTER TABLE citus_local_table_3 ENABLE ROW LEVEL SECURITY; + + CREATE ROLE table_users; + CREATE POLICY table_policy ON citus_local_table_3 TO table_users + USING (table_user = current_user); + + -- this should error out + SELECT create_citus_local_table('citus_local_table_3'); +ROLLBACK; + +-- show that we properly handle sequences on citus local tables -- + +BEGIN; + CREATE SEQUENCE col3_seq; + CREATE TABLE citus_local_table_3 (col1 serial, col2 int, col3 int DEFAULT nextval('col3_seq')); + + SELECT create_citus_local_table('citus_local_table_3'); + + -- print column default expressions + -- we should only see shell relation below + SELECT table_name, column_name, column_default + FROM information_schema.COLUMNS + WHERE table_name like 'citus_local_table_3%' and column_default != '' ORDER BY 1,2; + + -- print sequence ownerships + -- show that the only internal sequence is on col1 and it is owned by shell relation + SELECT s.relname as sequence_name, t.relname, a.attname + FROM pg_class s + JOIN pg_depend d on d.objid=s.oid and d.classid='pg_class'::regclass and d.refclassid='pg_class'::regclass + JOIN pg_class t on t.oid=d.refobjid + JOIN pg_attribute a on a.attrelid=t.oid and a.attnum=d.refobjsubid + WHERE s.relkind='S' and s.relname like 'citus_local_table_3%' ORDER BY 1,2; +ROLLBACK; + +-- test foreign tables using fake FDW -- + +CREATE FOREIGN TABLE foreign_table ( + id bigint not null, + full_name text not null default '' +) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); + +-- observe that we do not create fdw server for shell table, both shard relation +-- & shell relation points to the same same server object +SELECT create_citus_local_table('foreign_table'); + +-- drop them for next tests +DROP TABLE citus_local_table_1, citus_local_table_2, distributed_table; + +-- create test tables +CREATE TABLE citus_local_table_1 (a int primary key); +SELECT create_citus_local_table('citus_local_table_1'); + +CREATE TABLE citus_local_table_2 (a int primary key); +SELECT create_citus_local_table('citus_local_table_2'); + +CREATE TABLE local_table (a int primary key); + +CREATE TABLE distributed_table (a int primary key); +SELECT create_distributed_table('distributed_table', 'a'); + +CREATE TABLE reference_table (a int primary key); +SELECT create_reference_table('reference_table'); + +-- show that colociation of citus local tables are not supported for now +-- between citus local tables +SELECT mark_tables_colocated('citus_local_table_1', ARRAY['citus_local_table_2']); + +-- between citus local tables and reference tables +SELECT mark_tables_colocated('citus_local_table_1', ARRAY['reference_table']); +SELECT mark_tables_colocated('reference_table', ARRAY['citus_local_table_1']); + +-- between citus local tables and distributed tables +SELECT mark_tables_colocated('citus_local_table_1', ARRAY['distributed_table']); +SELECT mark_tables_colocated('distributed_table', ARRAY['citus_local_table_1']); + +-- upgrade_to_reference_table is not supported +SELECT upgrade_to_reference_table('citus_local_table_1'); +-- master_create_empty_shard is not supported +SELECT master_create_empty_shard('citus_local_table_1'); +-- get_shard_id_for_distribution_column is supported +SELECT get_shard_id_for_distribution_column('citus_local_table_1', 'not_checking_this_arg_for_non_dist_tables'); +SELECT get_shard_id_for_distribution_column('citus_local_table_1'); +-- master_copy_shard_placement is not supported +SELECT master_copy_shard_placement(shardid, 'localhost', :master_port, 'localhost', :worker_1_port, true) +FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='citus_local_table_1'::regclass) as shardid; +-- undistribute_table is supported +BEGIN; + SELECT undistribute_table('citus_local_table_1'); +ROLLBACK; + +-- tests with citus local tables initially having foreign key relationships + +CREATE TABLE local_table_1 (a int primary key); +CREATE TABLE local_table_2 (a int primary key references local_table_1(a)); +CREATE TABLE local_table_3 (a int primary key, b int references local_table_3(a)); + +-- below two should fail as we do not allow foreign keys between +-- postgres local tables and citus local tables +SELECT create_citus_local_table('local_table_1'); +SELECT create_citus_local_table('local_table_2'); + +-- below should work as we allow initial self references in citus local tables +SELECT create_citus_local_table('local_table_3'); + +------------------------------------------------------------------ +----- tests for object names that should be escaped properly ----- +------------------------------------------------------------------ + +CREATE SCHEMA "CiTUS!LocalTables"; + +-- create table with weird names +CREATE TABLE "CiTUS!LocalTables"."LocalTabLE.1!?!"(id int, "TeNANt_Id" int); + +-- should work +SELECT create_citus_local_table('"CiTUS!LocalTables"."LocalTabLE.1!?!"'); + +-- drop the table before creating it when the search path is set +SET search_path to "CiTUS!LocalTables" ; +DROP TABLE "LocalTabLE.1!?!"; + +-- have a custom type in the local table +CREATE TYPE local_type AS (key int, value jsonb); + +-- create btree_gist for GiST index +CREATE EXTENSION btree_gist; + +CREATE TABLE "LocalTabLE.1!?!"( + id int PRIMARY KEY, + "TeNANt_Id" int, + "local_Type" local_type, + "jsondata" jsonb NOT NULL, + name text, + price numeric CHECK (price > 0), + serial_data bigserial, UNIQUE (id, price), + EXCLUDE USING GIST (name WITH =)); + +-- create some objects before create_citus_local_table +CREATE INDEX "my!Index1" ON "LocalTabLE.1!?!"(id) WITH ( fillfactor = 80 ) WHERE id > 10; +CREATE UNIQUE INDEX uniqueIndex ON "LocalTabLE.1!?!" (id); + +-- ingest some data before create_citus_local_table +INSERT INTO "LocalTabLE.1!?!" VALUES (1, 1, (1, row_to_json(row(1,1)))::local_type, row_to_json(row(1,1), true)), + (2, 1, (2, row_to_json(row(2,2)))::local_type, row_to_json(row(2,2), 'false')); + +-- create a replica identity before create_citus_local_table +ALTER TABLE "LocalTabLE.1!?!" REPLICA IDENTITY USING INDEX uniqueIndex; + +-- this shouldn't give any syntax errors +SELECT create_citus_local_table('"LocalTabLE.1!?!"'); + +-- create some objects after create_citus_local_table +CREATE INDEX "my!Index2" ON "LocalTabLE.1!?!"(id) WITH ( fillfactor = 90 ) WHERE id < 20; +CREATE UNIQUE INDEX uniqueIndex2 ON "LocalTabLE.1!?!"(id); + +----------------------------------- +---- utility command execution ---- +----------------------------------- + +SET search_path TO citus_local_tables_test_schema; + +-- any foreign key between citus local tables and other tables cannot be set for now +-- most should error out (for now with meaningless error messages) + +-- between citus local tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(a) references citus_local_table_2(a); + +-- between citus local tables and reference tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_ref FOREIGN KEY(a) references reference_table(a); +ALTER TABLE reference_table ADD CONSTRAINT fkey_ref_to_c FOREIGN KEY(a) references citus_local_table_1(a); + +-- between citus local tables and distributed tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_dist FOREIGN KEY(a) references distributed_table(a); +ALTER TABLE distributed_table ADD CONSTRAINT fkey_dist_to_c FOREIGN KEY(a) references citus_local_table_1(a); + +-- between citus local tables and local tables +ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_c_to_local FOREIGN KEY(a) references local_table(a); +ALTER TABLE local_table ADD CONSTRAINT fkey_local_to_c FOREIGN KEY(a) references citus_local_table_1(a); + +-- cleanup at exit +DROP SCHEMA citus_local_tables_test_schema, "CiTUS!LocalTables" CASCADE; diff --git a/src/test/regress/sql/multi_mx_create_table.sql b/src/test/regress/sql/multi_mx_create_table.sql index 67ceab032..1b633c08a 100644 --- a/src/test/regress/sql/multi_mx_create_table.sql +++ b/src/test/regress/sql/multi_mx_create_table.sql @@ -59,6 +59,10 @@ CREATE TYPE order_side_mx AS ENUM ('buy', 'sell'); -- now create required stuff in the worker 1 \c - - - :worker_1_port +-- show that we do not support creating citus local tables from mx workers for now +CREATE TABLE citus_local_table(a int); +SELECT create_citus_local_table('citus_local_table'); + -- create schema to test schema support CREATE SCHEMA citus_mx_test_schema_join_1; CREATE SCHEMA citus_mx_test_schema_join_2;