diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 60c50d3e8..2b723556c 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -111,6 +111,8 @@ $(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql cat $^ > $@ $(EXTENSION)--6.1-8.sql: $(EXTENSION)--6.1-7.sql $(EXTENSION)--6.1-7--6.1-8.sql cat $^ > $@ +$(EXTENSION)--6.1-9.sql: $(EXTENSION)--6.1-8.sql $(EXTENSION)--6.1-8--6.1-9.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-8--6.1-9.sql b/src/backend/distributed/citus--6.1-8--6.1-9.sql new file mode 100644 index 000000000..a1bfd7634 --- /dev/null +++ b/src/backend/distributed/citus--6.1-8--6.1-9.sql @@ -0,0 +1,89 @@ +/* citus--6.1-8--6.1-9.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION master_drop_distributed_table_metadata(logicalrelid regclass, + schema_name text, + table_name text) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_drop_distributed_table_metadata$$; +COMMENT ON FUNCTION master_drop_distributed_table_metadata(logicalrelid regclass, + schema_name text, + table_name text) + IS 'delete metadata of the distributed table'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger() + RETURNS event_trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = pg_catalog + AS $cdbdt$ +DECLARE + v_obj record; + sequence_names text[] := '{}'; + node_names text[] := '{}'; + node_ports bigint[] := '{}'; + node_name text; + node_port bigint; + table_colocation_id integer; +BEGIN + -- collect set of dropped sequences to drop on workers later + SELECT array_agg(object_identity) INTO sequence_names + FROM pg_event_trigger_dropped_objects() + WHERE object_type = 'sequence'; + + -- Must accumulate set of affected nodes before deleting placements, as + -- master_drop_all_shards will erase their rows, making it impossible for + -- us to know where to drop sequences (which must be dropped after shards, + -- since they have default value expressions which depend on sequences). 
+ SELECT array_agg(sp.nodename), array_agg(sp.nodeport) + INTO node_names, node_ports + FROM pg_event_trigger_dropped_objects() AS dobj, + pg_dist_shard AS s, + pg_dist_shard_placement AS sp + WHERE dobj.object_type IN ('table', 'foreign table') + AND dobj.objid = s.logicalrelid + AND s.shardid = sp.shardid; + + FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP + IF v_obj.object_type NOT IN ('table', 'foreign table') THEN + CONTINUE; + END IF; + + -- nothing to do if not a distributed table + IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN + CONTINUE; + END IF; + + -- get colocation group + SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; + + -- ensure all shards are dropped + PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name); + + PERFORM master_drop_distributed_table_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name); + + -- drop colocation group if all referencing tables are dropped + IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN + DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id; + END IF; + END LOOP; + + IF cardinality(sequence_names) = 0 THEN + RETURN; + END IF; + + FOR node_name, node_port IN + SELECT DISTINCT name, port + FROM unnest(node_names, node_ports) AS nodes(name, port) + LOOP + PERFORM master_drop_sequences(sequence_names, node_name, node_port); + END LOOP; +END; +$cdbdt$; + +COMMENT ON FUNCTION citus_drop_trigger() + IS 'perform checks and actions at the end of DROP actions'; + +RESET search_path; \ No newline at end of file diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 1b343eb0a..1413d9ba1 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-8' +default_version = '6.1-9' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 1a68dbfb1..1f28ead9b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -38,9 +38,11 @@ #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_logical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" +#include "distributed/worker_transaction.h" #include "executor/spi.h" #include "nodes/execnodes.h" #include "nodes/nodeFuncs.h" @@ -79,8 +81,7 @@ static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, char replicationModel); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, char *colocateWithTableName, - int shardCount, int replicationFactor, - char replicationModel); + int shardCount, int replicationFactor); static Oid ColumnType(Oid relationId, char *columnName); @@ -172,8 +173,23 @@ create_distributed_table(PG_FUNCTION_ARGS) /* use configuration values for shard count and shard replication factor */ CreateHashDistributedTable(relationId, distributionColumnName, colocateWithTableName, ShardCount, - ShardReplicationFactor, - REPLICATION_MODEL_COORDINATOR); + 
ShardReplicationFactor); + + if (ShouldSyncTableMetadata(relationId)) + { + List *commandList = GetDistributedTableDDLEvents(relationId); + ListCell *commandCell = NULL; + + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + + /* send the commands one by one */ + foreach(commandCell, commandList) + { + char *command = (char *) lfirst(commandCell); + + SendCommandToWorkers(WORKERS_WITH_METADATA, command); + } + } PG_RETURN_VOID(); } @@ -953,17 +969,28 @@ CreateTruncateTrigger(Oid relationId) static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, char *colocateWithTableName, int shardCount, - int replicationFactor, char replicationModel) + int replicationFactor) { Relation distributedRelation = NULL; Relation pgDistColocation = NULL; uint32 colocationId = INVALID_COLOCATION_ID; Oid sourceRelationId = InvalidOid; Oid distributionColumnType = InvalidOid; + char replicationModel = 0; /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); + /* all hash-distributed tables with repfactor=1 are treated as MX tables */ + if (replicationFactor == 1) + { + replicationModel = REPLICATION_MODEL_STREAMING; + } + else + { + replicationModel = REPLICATION_MODEL_COORDINATOR; + } + /* * Get an exclusive lock on the colocation system catalog. Therefore, we * can be sure that there will no modifications on the colocation table @@ -1004,7 +1031,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, /* create distributed table metadata */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - colocationId, REPLICATION_MODEL_COORDINATOR); + colocationId, replicationModel); /* create shards */ if (sourceRelationId != InvalidOid) diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c new file mode 100644 index 000000000..ebe40e9d8 --- /dev/null +++ b/src/backend/distributed/commands/drop_distributed_table.c @@ -0,0 +1,56 @@ +/*------------------------------------------------------------------------- + * + * drop_distributed_table.c + * Routines related to dropping distributed relations from a trigger. + * + * Copyright (c) 2012-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" +#include "distributed/metadata_sync.h" +#include "distributed/worker_transaction.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(master_drop_distributed_table_metadata); + + +/* + * master_drop_distributed_table_metadata removes the entry of the specified distributed + * table from pg_dist_partition and drops the table from the workers if needed. 
+ */ +Datum +master_drop_distributed_table_metadata(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + text *schemaNameText = PG_GETARG_TEXT_P(1); + text *tableNameText = PG_GETARG_TEXT_P(2); + bool shouldSyncMetadata = false; + + char *schemaName = text_to_cstring(schemaNameText); + char *tableName = text_to_cstring(tableNameText); + + CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName); + + DeletePartitionRow(relationId); + + shouldSyncMetadata = ShouldSyncTableMetadata(relationId); + if (shouldSyncMetadata) + { + char *deleteDistributionCommand = NULL; + + /* drop the distributed table metadata on the workers */ + deleteDistributionCommand = DistributionDeleteCommand(schemaName, tableName); + SendCommandToWorkers(WORKERS_WITH_METADATA, deleteDistributionCommand); + } + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 4066ed8f0..9bab7b39b 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -37,6 +37,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_copy.h" #include "distributed/multi_join_order.h" #include "distributed/multi_planner.h" @@ -49,6 +50,7 @@ #include "distributed/transaction_management.h" #include "distributed/transmit.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" #include "executor/executor.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" @@ -107,6 +109,8 @@ static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, bool isTopLevel); static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, bool isTopLevel); +static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, + const char *alterTableCommand); static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, const char *alterObjectSchemaCommand, bool isTopLevel); @@ -147,6 +151,7 @@ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) static bool warnedUserAbout2PC = false; + /* * Utility for handling citus specific concerns around utility statements. * @@ -167,6 +172,8 @@ multi_ProcessUtility(Node *parsetree, DestReceiver *dest, char *completionTag) { + bool schemaNode = SchemaNode(); + bool propagateChanges = schemaNode && EnableDDLPropagation; bool commandMustRunAsOwner = false; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; @@ -222,8 +229,11 @@ multi_ProcessUtility(Node *parsetree, ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree); } - /* ddl commands are propagated to workers only if EnableDDLPropagation is set */ - if (EnableDDLPropagation) + /* + * DDL commands are propagated to workers only if EnableDDLPropagation is + * set to true and the current node is the schema node + */ + if (propagateChanges) { bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); @@ -289,6 +299,24 @@ multi_ProcessUtility(Node *parsetree, "move all tables."))); } } + else if (!schemaNode) + { + if (IsA(parsetree, AlterTableStmt)) + { + AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; + if (alterTableStmt->relkind == OBJECT_TABLE) + { + /* + * When the schema node issues an ALTER TABLE ... 
ADD FOREIGN KEY
+ * command, the validation step should be skipped on the distributed
+ * table of the worker. Therefore, we check whether the given ALTER
+ * TABLE statement adds a FOREIGN KEY constraint and, if so, disable the
+ * validation step. Note that validation is done on the shard level.
+ */
+ parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString);
+ }
+ }
+ }

 /*
 * Inform the user about potential caveats.
@@ -858,6 +886,68 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
 }

+/*
+ * WorkerProcessAlterTableStmt checks and processes ALTER TABLE statements that are
+ * run against a distributed table on a worker node. Currently, it only processes
+ * ALTER TABLE ... ADD FOREIGN KEY commands in order to skip the validation step.
+ */
+static Node *
+WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
+ const char *alterTableCommand)
+{
+ LOCKMODE lockmode = 0;
+ Oid leftRelationId = InvalidOid;
+ bool isDistributedRelation = false;
+ List *commandList = NIL;
+ ListCell *commandCell = NULL;
+
+ /* first check whether a distributed relation is affected */
+ if (alterTableStatement->relation == NULL)
+ {
+ return (Node *) alterTableStatement;
+ }
+
+ lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
+ leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
+ if (!OidIsValid(leftRelationId))
+ {
+ return (Node *) alterTableStatement;
+ }
+
+ isDistributedRelation = IsDistributedTable(leftRelationId);
+ if (!isDistributedRelation)
+ {
+ return (Node *) alterTableStatement;
+ }
+
+ /*
+ * Check whether there is an ADD FOREIGN KEY constraint in the subcommand list.
+ * If there is, we set skip_validation to true to prevent PostgreSQL from
+ * verifying the validity of the foreign constraint on this node; validity is
+ * checked at the shard level anyway.
+ */
+ commandList = alterTableStatement->cmds;
+
+ foreach(commandCell, commandList)
+ {
+ AlterTableCmd *command = (AlterTableCmd *) lfirst(commandCell);
+ AlterTableType alterTableType = command->subtype;
+
+ if (alterTableType == AT_AddConstraint)
+ {
+ Constraint *constraint = (Constraint *) command->def;
+ if (constraint->contype == CONSTR_FOREIGN)
+ {
+ /* foreign constraint validation will be done on the shards */
+ constraint->skip_validation = true;
+ }
+ }
+ }
+
+ return (Node *) alterTableStatement;
+}
+
+
 /*
 * ProcessAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed
 * objects.
The function first checks if the statement belongs to a distributed objects @@ -1866,6 +1956,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, bool isTopLevel) { List *taskList = NIL; + bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId); if (XactModificationLevel == XACT_MODIFICATION_DATA) { @@ -1877,6 +1968,12 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, ShowNoticeIfNotUsing2PC(); + if (shouldSyncMetadata) + { + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); + } + taskList = DDLTaskList(relationId, ddlCommandString); ExecuteModifyTasksWithoutResults(taskList); @@ -1900,6 +1997,7 @@ ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, const char *ddlCommandString, bool isTopLevel) { List *taskList = NIL; + bool shouldSyncMetadata = false; if (XactModificationLevel == XACT_MODIFICATION_DATA) { @@ -1911,6 +2009,18 @@ ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, ShowNoticeIfNotUsing2PC(); + /* + * It is sufficient to check only one of the tables for metadata syncing on workers, + * since the colocation of two tables implies that either both or none of them have + * metadata on workers. + */ + shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId); + if (shouldSyncMetadata) + { + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); + } + taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString); ExecuteModifyTasksWithoutResults(taskList); diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 70e3d1b2e..9c7a7dafa 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -196,45 +196,16 @@ master_drop_all_shards(PG_FUNCTION_ARGS) text *schemaNameText = PG_GETARG_TEXT_P(1); text *relationNameText = PG_GETARG_TEXT_P(2); - char *schemaName = NULL; - char *relationName = NULL; bool isTopLevel = true; List *shardIntervalList = NIL; int droppedShardCount = 0; + char *schemaName = text_to_cstring(schemaNameText); + char *relationName = text_to_cstring(relationNameText); + PreventTransactionChain(isTopLevel, "DROP distributed table"); - relationName = get_rel_name(relationId); - - if (relationName != NULL) - { - /* ensure proper values are used if the table exists */ - Oid schemaId = get_rel_namespace(relationId); - schemaName = get_namespace_name(schemaId); - - /* - * Only allow the owner to drop all shards, this is more akin to DDL - * than DELETE. - */ - EnsureTableOwner(relationId); - } - else - { - /* table has been dropped, rely on user-supplied values */ - schemaName = text_to_cstring(schemaNameText); - relationName = text_to_cstring(relationNameText); - - /* - * Verify that this only is run as superuser - that's how it's used in - * our drop event trigger, and we can't verify permissions for an - * already dropped relation. 
- */ - if (!superuser()) - { - ereport(ERROR, (errmsg("cannot drop all shards of a dropped table as " - "non-superuser"))); - } - } + CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName); shardIntervalList = LoadShardIntervalList(relationId); droppedShardCount = DropShards(relationId, schemaName, relationName, @@ -299,6 +270,35 @@ master_drop_sequences(PG_FUNCTION_ARGS) } +/* + * CheckTableSchemaNameForDrop errors out if the current user does not + * have permission to undistribute the given relation, taking into + * account that it may be called from the drop trigger. If the table exists, + * the function rewrites the given table and schema name. + */ +void +CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, char **tableName) +{ + char *tempTableName = get_rel_name(relationId); + + if (tempTableName != NULL) + { + /* ensure proper values are used if the table exists */ + Oid schemaId = get_rel_namespace(relationId); + (*schemaName) = get_namespace_name(schemaId); + (*tableName) = tempTableName; + + EnsureTableOwner(relationId); + } + else if (!superuser()) + { + /* table does not exist, must be called from drop trigger */ + ereport(ERROR, (errmsg("cannot drop distributed table metadata as a " + "non-superuser"))); + } +} + + /* * DropShards drops all given shards in a relation. The id, name and schema * for the relation are explicitly provided, since this function may be diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 03824c008..60b77a164 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -515,6 +515,49 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId, } +/* + * DeletePartitionRow removes the row from pg_dist_partition where the logicalrelid + * field equals to distributedRelationId. Then, the function invalidates the + * metadata cache. + */ +void +DeletePartitionRow(Oid distributedRelationId) +{ + Relation pgDistPartition = NULL; + HeapTuple heapTuple = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + + pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId)); + + scanDescriptor = systable_beginscan(pgDistPartition, InvalidOid, false, NULL, + scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for partition %d", + distributedRelationId))); + } + + simple_heap_delete(pgDistPartition, &heapTuple->t_self); + + systable_endscan(scanDescriptor); + + /* invalidate the cache */ + CitusInvalidateRelcacheByRelid(distributedRelationId); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); + + heap_close(pgDistPartition, RowExclusiveLock); +} + + /* * DeleteShardRow opens the shard system catalog, finds the unique row that has * the given shardId, and deletes this row. 
diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 2ff2f5446..43c3332bf 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -847,6 +847,17 @@ ShardStorageType(Oid relationId) } +/* + * SchemaNode function returns true if this node is identified as the + * schema/coordinator/master node of the cluster. + */ +bool +SchemaNode(void) +{ + return (GetLocalGroupId() == 0); +} + + /* * WorkerNodeGetDatum converts the worker node passed to it into its datum * representation. To do this, the function first creates the heap tuple from diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 643b6102c..03e3e7019 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -271,6 +271,53 @@ MetadataCreateCommands(void) } +/* + * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to + * create the given distributed table on a worker. The list includes setting up any + * sequences, setting the owner of the table, inserting table and shard metadata, + * setting the truncate trigger and foreign key constraints. + */ +List * +GetDistributedTableDDLEvents(Oid relationId) +{ + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + + List *shardIntervalList = NIL; + List *commandList = NIL; + List *foreignConstraintCommands = NIL; + List *shardMetadataInsertCommandList = NIL; + char *tableOwnerResetCommand = NULL; + char *metadataCommand = NULL; + char *truncateTriggerCreateCommand = NULL; + + /* commands to create the table */ + commandList = GetTableDDLEvents(relationId); + + /* command to reset the table owner */ + tableOwnerResetCommand = TableOwnerResetCommand(relationId); + commandList = lappend(commandList, tableOwnerResetCommand); + + /* command to insert pg_dist_partition entry */ + metadataCommand = DistributionCreateCommand(cacheEntry); + commandList = lappend(commandList, metadataCommand); + + /* commands to create the truncate trigger of the mx table */ + truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId); + commandList = lappend(commandList, truncateTriggerCreateCommand); + + /* commands to insert pg_dist_shard & pg_dist_shard_placement entries */ + shardIntervalList = LoadShardIntervalList(relationId); + shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList); + commandList = list_concat(commandList, shardMetadataInsertCommandList); + + /* commands to create foreign key constraints */ + foreignConstraintCommands = GetTableForeignConstraintCommands(relationId); + commandList = list_concat(commandList, foreignConstraintCommands); + + return commandList; +} + + /* * MetadataDropCommands returns list of queries that are required to * drop all the metadata of the node that are related to clustered tables. @@ -547,6 +594,25 @@ NodeDeleteCommand(uint32 nodeId) } +/* + * ColocationIdUpdateCommand creates the SQL command to change the colocationId + * of the table with the given name to the given colocationId in pg_dist_partition + * table. 
+ */ +char * +ColocationIdUpdateCommand(Oid relationId, uint32 colocationId) +{ + StringInfo command = makeStringInfo(); + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + appendStringInfo(command, "UPDATE pg_dist_partition " + "SET colocationid = %d " + "WHERE logicalrelid = %s::regclass", + colocationId, quote_literal_cstr(qualifiedRelationName)); + + return command->data; +} + + /* * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id * of a worker and returns the command in a string. diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 02ac8285c..4a2035dcd 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -22,11 +22,13 @@ #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_logical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" @@ -617,6 +619,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) HeapTuple heapTuple = NULL; TupleDesc tupleDescriptor = NULL; SysScanDesc scanDescriptor = NULL; + bool shouldSyncMetadata = false; bool indexOK = true; int scanKeyCount = 1; ScanKeyData scanKey[scanKeyCount]; @@ -660,6 +663,15 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) systable_endscan(scanDescriptor); heap_close(pgDistPartition, NoLock); + + shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId); + if (shouldSyncMetadata) + { + char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId, + colocationId); + + SendCommandToWorkers(WORKERS_WITH_METADATA, updateColocationIdCommand); + } } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 4aef00c46..bb1bd75f9 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1377,6 +1377,7 @@ GetLocalGroupId(void) TupleDesc tupleDescriptor = NULL; Oid groupId = InvalidOid; Relation pgDistLocalGroupId = NULL; + Oid localGroupTableOid = InvalidOid; /* * Already set the group id, no need to read the heap again. 
@@ -1386,7 +1387,13 @@ GetLocalGroupId(void) return LocalGroupId; } - pgDistLocalGroupId = heap_open(DistLocalGroupIdRelationId(), AccessShareLock); + localGroupTableOid = get_relname_relid("pg_dist_local_group", PG_CATALOG_NAMESPACE); + if (localGroupTableOid == InvalidOid) + { + return 0; + } + + pgDistLocalGroupId = heap_open(localGroupTableOid, AccessShareLock); scanDescriptor = systable_beginscan(pgDistLocalGroupId, InvalidOid, false, diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 077f78dc9..d802f643f 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -29,9 +29,6 @@ PG_FUNCTION_INFO_V1(worker_drop_distributed_table); -static void DeletePartitionRow(Oid distributedRelationId); - - /* * worker_drop_distributed_table drops the distributed table with the given oid, * then, removes the associated rows from pg_dist_partition, pg_dist_shard and @@ -133,46 +130,3 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - - -/* - * DeletePartitionRow removes the row from pg_dist_partition where the logicalrelid - * field equals to distributedRelationId. Then, the function invalidates the - * metadata cache. - */ -void -DeletePartitionRow(Oid distributedRelationId) -{ - Relation pgDistPartition = NULL; - HeapTuple heapTuple = NULL; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - - pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); - - ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId)); - - scanDescriptor = systable_beginscan(pgDistPartition, InvalidOid, false, NULL, - scanKeyCount, scanKey); - - heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find valid entry for partition %d", - distributedRelationId))); - } - - simple_heap_delete(pgDistPartition, &heapTuple->t_self); - - systable_endscan(scanDescriptor); - - /* invalidate the cache */ - CitusInvalidateRelcacheByRelid(distributedRelationId); - - /* increment the counter so that next command can see the row */ - CommandCounterIncrement(); - - heap_close(pgDistPartition, RowExclusiveLock); -} diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 3a8a62670..1e858ae6a 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -76,6 +76,7 @@ extern void DeleteShardRow(uint64 shardId); extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId, char shardState, uint64 shardLength, char *nodeName, uint32 nodePort); +extern void DeletePartitionRow(Oid distributedRelationId); extern void DeleteShardRow(uint64 shardId); extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 7600093c3..5412d37c9 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -91,6 +91,8 @@ extern int ShardMaxSize; extern int ShardPlacementPolicy; +extern bool SchemaNode(void); + /* Function declarations local to the distributed module */ extern bool CStoreTable(Oid relationId); extern uint64 
GetNextShardId(void); @@ -112,6 +114,8 @@ extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, List *ddlCommandList, List *foreignConstraintCommadList); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); +extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, + char **tableName); /* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 94efd573a..936c7b34e 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -20,6 +20,7 @@ /* Functions declarations for metadata syncing */ extern bool ShouldSyncTableMetadata(Oid relationId); extern List * MetadataCreateCommands(void); +extern List * GetDistributedTableDDLEvents(Oid relationId); extern List * MetadataDropCommands(void); extern char * DistributionCreateCommand(DistTableCacheEntry *cacheEntry); extern char * DistributionDeleteCommand(char *schemaName, @@ -28,11 +29,13 @@ extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); extern char * NodeDeleteCommand(uint32 nodeId); +extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node" #define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \ "SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition" +#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" #endif /* METADATA_SYNC_H */ diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index cc0e44623..724f5fa04 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -371,6 +371,8 @@ SELECT create_distributed_table('table2_groupB', 'id'); (1 row) +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='table1_groupB'::regclass; +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='table2_groupB'::regclass; -- revert back to default shard replication factor SET citus.shard_replication_factor to DEFAULT; -- change partition column type diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 4ce0aad0c..27caeeb87 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -137,3 +137,53 @@ SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'ap (1 row) +-- Show that when a hash distributed table with replication factor=1 is created, it +-- automatically marked as streaming replicated +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_table_test (col1 int, col2 text); +SELECT create_distributed_table('mx_table_test', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; + repmodel +---------- + s +(1 row) + +DROP TABLE mx_table_test; +-- Show that it is not possible to create an mx table with the old +-- master_create_distributed_table function +CREATE TABLE mx_table_test (col1 int, col2 text); +SELECT master_create_distributed_table('mx_table_test', 'col1', 
'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE mx_table_test; +-- Show that when replication factor > 1 the table is created as coordinator-replicated +SET citus.shard_replication_factor TO 2; +CREATE TABLE mx_table_test (col1 int, col2 text); +SELECT create_distributed_table('mx_table_test', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE mx_table_test; +SET citus.shard_replication_factor TO default; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index a8f167647..1c9e7539c 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -66,6 +66,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; ALTER EXTENSION citus UPDATE TO '6.1-7'; ALTER EXTENSION citus UPDATE TO '6.1-8'; +ALTER EXTENSION citus UPDATE TO '6.1-9'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_sync.out similarity index 69% rename from src/test/regress/expected/multi_metadata_snapshot.out rename to src/test/regress/expected/multi_metadata_sync.out index f3ef9c6aa..7df75b93d 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -1,7 +1,8 @@ -- --- MULTI_METADATA_SNAPSHOT +-- MULTI_METADATA_SYNC -- --- Tests for metadata snapshot functions. +-- Tests for metadata snapshot functions, metadata syncing functions and propagation of +-- metadata changes to MX tables. 
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1310000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000; SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id @@ -250,6 +251,7 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': (1 row) -- Make sure that start_metadata_sync_to_node considers foreign key constraints +\c - - - :master_port SET citus.shard_replication_factor TO 1; CREATE SCHEMA mx_testing_schema_2; CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); @@ -267,11 +269,6 @@ SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); (1 row) -UPDATE - pg_dist_partition SET repmodel='s' -WHERE - logicalrelid='mx_testing_schema.fk_test_1'::regclass - OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node @@ -292,6 +289,8 @@ Foreign-key constraints: "fk_test_2_col1_fkey" FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3) \c - - - :master_port +DROP TABLE mx_testing_schema_2.fk_test_2; +DROP TABLE mx_testing_schema.fk_test_1; RESET citus.shard_replication_factor; -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port @@ -382,6 +381,60 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; f (1 row) +-- Check that the distributed table can be queried from the worker +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +CREATE TABLE mx_query_test (a int, b text, c int); +SELECT create_distributed_table('mx_query_test', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_query_test'::regclass; + repmodel +---------- + s +(1 row) + +INSERT INTO mx_query_test VALUES (1, 'one', 1); +INSERT INTO mx_query_test VALUES (2, 'two', 4); +INSERT INTO mx_query_test VALUES (3, 'three', 9); +INSERT INTO mx_query_test VALUES (4, 'four', 16); +INSERT INTO mx_query_test VALUES (5, 'five', 24); +\c - - - :worker_1_port +SELECT * FROM mx_query_test ORDER BY a; + a | b | c +---+-------+---- + 1 | one | 1 + 2 | two | 4 + 3 | three | 9 + 4 | four | 16 + 5 | five | 24 +(5 rows) + +INSERT INTO mx_query_test VALUES (6, 'six', 36); +UPDATE mx_query_test SET c = 25 WHERE a = 5; +\c - - - :master_port +SELECT * FROM mx_query_test ORDER BY a; + a | b | c +---+-------+---- + 1 | one | 1 + 2 | two | 4 + 3 | three | 9 + 4 | four | 16 + 5 | five | 25 + 6 | six | 36 +(6 rows) + +\c - - - :master_port +DROP TABLE mx_query_test; -- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -408,8 +461,411 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; f (1 row) +-- Test DDL propagation in MX tables +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +SET citus.shard_count = 5; +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE SCHEMA mx_test_schema_1; +CREATE SCHEMA mx_test_schema_2; +-- Create MX tables +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); +CREATE INDEX mx_index_1 
ON mx_test_schema_1.mx_table_1 (col1); +CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text); +CREATE INDEX mx_index_2 ON mx_test_schema_2.mx_table_2 (col2); +ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col1) REFERENCES mx_test_schema_1.mx_table_1(col1); +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_2" btree (col2) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +SELECT create_distributed_table('mx_test_schema_1.mx_table_1', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +-- Check that created tables are marked as streaming replicated tables +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid; + logicalrelid | repmodel +-----------------------------+---------- + mx_test_schema_1.mx_table_1 | s + mx_test_schema_2.mx_table_2 | s +(2 rows) + +-- See the shards and placements of the mx tables +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + logicalrelid | shardid | nodename | nodeport +-----------------------------+---------+-----------+---------- + mx_test_schema_1.mx_table_1 | 1310104 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310105 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310106 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310107 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310108 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310109 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310110 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310111 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310112 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310113 | localhost | 57637 +(10 rows) + + +-- Check that metadata of MX tables exist on the metadata worker +\c - - - :worker_1_port +-- Check that tables are created +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_2" btree (col2) +Foreign-key constraints: + 
"mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that table metadata are created +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + logicalrelid | repmodel +-----------------------------+---------- + mx_test_schema_1.mx_table_1 | s + mx_test_schema_2.mx_table_2 | s +(2 rows) + +-- Check that shard and placement data are created +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + logicalrelid | shardid | nodename | nodeport +-----------------------------+---------+-----------+---------- + mx_test_schema_1.mx_table_1 | 1310104 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310105 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310106 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310107 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310108 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310109 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310110 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310111 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310112 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310113 | localhost | 57637 +(10 rows) + +-- Check that metadata of MX tables don't exist on the non-metadata worker +\c - - - :worker_2_port +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 +SELECT * FROM pg_dist_partition; + logicalrelid | partmethod | partkey | colocationid | repmodel +--------------+------------+---------+--------------+---------- +(0 rows) + +SELECT * FROM pg_dist_shard; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue +--------------+---------+--------------+---------------+--------------- +(0 rows) + +SELECT * FROM pg_dist_shard_placement; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+----------+----------+------------- +(0 rows) + +-- Check that CREATE INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1); +WARNING: hash indexes are not WAL-logged and their use is discouraged +CREATE UNIQUE INDEX mx_index_4 ON mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_4" UNIQUE, btree (col1) + "mx_index_2" btree (col2) + "mx_index_3" hash (col1) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that DROP INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +DROP INDEX mx_test_schema_2.mx_index_3; +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_4" UNIQUE, btree (col1) + "mx_index_2" btree (col2) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that ALTER TABLE statements are 
propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; +ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | + col3 | integer | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that foreign key constraint with NOT VALID works as well +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint_2 +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1) +NOT VALID; +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | + col3 | integer | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Foreign-key constraints: + "mx_fk_constraint_2" FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) NOT VALID +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that mark_tables_colocated call propagates the changes to the workers +\c - - - :master_port +SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; +SET citus.shard_count TO 7; +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_colocation_test_1 (a int); +SELECT create_distributed_table('mx_colocation_test_1', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE mx_colocation_test_2 (a int); +SELECT create_distributed_table('mx_colocation_test_2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +-- Check the colocation IDs of the created tables +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass +ORDER BY logicalrelid; + logicalrelid | colocationid +----------------------+-------------- + mx_colocation_test_1 | 10000 + mx_colocation_test_2 | 10000 +(2 rows) + + +-- Reset the colocation IDs of the test tables +DELETE FROM + pg_dist_colocation +WHERE EXISTS ( + SELECT 1 + FROM pg_dist_partition + WHERE + colocationid = pg_dist_partition.colocationid + AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); +UPDATE + pg_dist_partition +SET + colocationid = 0 +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; +-- Mark tables colocated and see the changes on the master and the worker +SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']); + 
mark_tables_colocated +----------------------- + +(1 row) + +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + logicalrelid | colocationid +----------------------+-------------- + mx_colocation_test_1 | 10001 + mx_colocation_test_2 | 10001 +(2 rows) + +\c - - - :worker_1_port +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + logicalrelid | colocationid +----------------------+-------------- + mx_colocation_test_1 | 10001 + mx_colocation_test_2 | 10001 +(2 rows) + +\c - - - :master_port +-- Check that DROP TABLE on MX tables works +DROP TABLE mx_colocation_test_1; +DROP TABLE mx_colocation_test_2; +\d mx_colocation_test_1 +\d mx_colocation_test_2 +\c - - - :worker_1_port +\d mx_colocation_test_1 +\d mx_colocation_test_2 + +-- Check that dropped MX table can be recreated again +\c - - - :master_port +SET citus.shard_count TO 7; +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_temp_drop_test (a int); +SELECT create_distributed_table('mx_temp_drop_test', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass; + logicalrelid | repmodel +-------------------+---------- + mx_temp_drop_test | s +(1 row) + +DROP TABLE mx_temp_drop_test; +CREATE TABLE mx_temp_drop_test (a int); +SELECT create_distributed_table('mx_temp_drop_test', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass; + logicalrelid | repmodel +-------------------+---------- + mx_temp_drop_test | s +(1 row) + +DROP TABLE mx_temp_drop_test; -- Cleanup \c - - - :worker_1_port +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; DELETE FROM pg_dist_node; DELETE FROM pg_dist_partition; @@ -429,5 +885,12 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); (1 row) -DROP TABLE mx_testing_schema.mx_test_table CASCADE; +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; +DROP TABLE mx_testing_schema.mx_test_table; +RESET citus.shard_count; +RESET citus.shard_replication_factor; +RESET citus.multi_shard_commit_protocol; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 5726a09f8..5b57052f4 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -139,7 +139,7 @@ test: multi_data_types test: multi_repartition_udt test: multi_repartitioned_subquery_udf test: multi_modifying_xacts -test: multi_metadata_snapshot +test: multi_metadata_sync test: multi_transaction_recovery # --------- diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index c1dfaeedb..336758f56 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ 
b/src/test/regress/sql/multi_colocation_utils.sql @@ -175,6 +175,9 @@ SELECT create_distributed_table('table1_groupB', 'id'); CREATE TABLE table2_groupB ( id int ); SELECT create_distributed_table('table2_groupB', 'id'); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='table1_groupB'::regclass; +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='table2_groupB'::regclass; + -- revert back to default shard replication factor SET citus.shard_replication_factor to DEFAULT; diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 17e12b7a2..1f0dfef22 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -111,3 +111,28 @@ CREATE TABLE supplier_single_shard s_comment varchar(101) not null ); SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'append'); + +-- Show that when a hash distributed table with replication factor=1 is created, it +-- automatically marked as streaming replicated +SET citus.shard_replication_factor TO 1; + +CREATE TABLE mx_table_test (col1 int, col2 text); +SELECT create_distributed_table('mx_table_test', 'col1'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; +DROP TABLE mx_table_test; + +-- Show that it is not possible to create an mx table with the old +-- master_create_distributed_table function +CREATE TABLE mx_table_test (col1 int, col2 text); +SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; +DROP TABLE mx_table_test; + +-- Show that when replication factor > 1 the table is created as coordinator-replicated +SET citus.shard_replication_factor TO 2; +CREATE TABLE mx_table_test (col1 int, col2 text); +SELECT create_distributed_table('mx_table_test', 'col1'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; +DROP TABLE mx_table_test; + +SET citus.shard_replication_factor TO default; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 89018fe21..ebeb66a3f 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -66,6 +66,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; ALTER EXTENSION citus UPDATE TO '6.1-7'; ALTER EXTENSION citus UPDATE TO '6.1-8'; +ALTER EXTENSION citus UPDATE TO '6.1-9'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_metadata_snapshot.sql b/src/test/regress/sql/multi_metadata_snapshot.sql deleted file mode 100644 index 63869e110..000000000 --- a/src/test/regress/sql/multi_metadata_snapshot.sql +++ /dev/null @@ -1,155 +0,0 @@ --- --- MULTI_METADATA_SNAPSHOT --- - --- Tests for metadata snapshot functions. 
- - -ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1310000; -ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000; - -SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id -\gset -ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000; - --- Create the necessary test utility function -CREATE FUNCTION master_metadata_snapshot() - RETURNS text[] - LANGUAGE C STRICT - AS 'citus'; - -COMMENT ON FUNCTION master_metadata_snapshot() - IS 'commands to create the metadata snapshot'; - --- Show that none of the existing tables are qualified to be MX tables -SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; - --- Show that, with no MX tables, metadata snapshot contains only the delete commands and --- pg_dist_node entries -SELECT unnest(master_metadata_snapshot()); - --- Create a test table with constraints and SERIAL -CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 SERIAL); -SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash'); -SELECT master_create_worker_shards('mx_test_table', 8, 1); - --- Set the replication model of the test table to streaming replication so that it is --- considered as an MX table -UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass; - --- Show that the created MX table is included in the metadata snapshot -SELECT unnest(master_metadata_snapshot()); - --- Show that CREATE INDEX commands are included in the metadata snapshot -CREATE INDEX mx_index ON mx_test_table(col_2); -SELECT unnest(master_metadata_snapshot()); - --- Show that schema changes are included in the metadata snapshot -CREATE SCHEMA mx_testing_schema; -ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema; -SELECT unnest(master_metadata_snapshot()); - --- Show that append distributed tables are not included in the metadata snapshot -CREATE TABLE non_mx_test_table (col_1 int, col_2 text); -SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append'); -UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass; -SELECT unnest(master_metadata_snapshot()); - --- Show that range distributed tables are not included in the metadata snapshot -UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; -SELECT unnest(master_metadata_snapshot()); - --- Test start_metadata_sync_to_node UDF - --- Ensure that hasmetadata=false for all nodes -SELECT count(*) FROM pg_dist_node WHERE hasmetadata=true; - --- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port; - --- Check that the metadata has been copied to the worker -\c - - - :worker_1_port -SELECT * FROM pg_dist_local_group; -SELECT * FROM pg_dist_node ORDER BY nodeid; -SELECT * FROM pg_dist_partition ORDER BY logicalrelid; -SELECT * FROM pg_dist_shard ORDER BY shardid; -SELECT * FROM pg_dist_shard_placement ORDER BY shardid; -\d mx_testing_schema.mx_test_table - --- Check that pg_dist_colocation is not synced -SELECT * FROM pg_dist_colocation ORDER BY colocationid; - --- Make sure that truncate trigger has been set for the MX table on worker -SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; - --- Make sure that start_metadata_sync_to_node considers foreign key constraints -SET 
citus.shard_replication_factor TO 1; - -CREATE SCHEMA mx_testing_schema_2; - -CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); -CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, - FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3)); - -SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1'); -SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); - -UPDATE - pg_dist_partition SET repmodel='s' -WHERE - logicalrelid='mx_testing_schema.fk_test_1'::regclass - OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass; - -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - --- Check that foreign key metadata exists on the worker -\c - - - :worker_1_port -\d mx_testing_schema_2.fk_test_2 -\c - - - :master_port - -RESET citus.shard_replication_factor; - --- Check that repeated calls to start_metadata_sync_to_node has no side effects -\c - - - :master_port -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -\c - - - :worker_1_port -SELECT * FROM pg_dist_local_group; -SELECT * FROM pg_dist_node ORDER BY nodeid; -SELECT * FROM pg_dist_partition ORDER BY logicalrelid; -SELECT * FROM pg_dist_shard ORDER BY shardid; -SELECT * FROM pg_dist_shard_placement ORDER BY shardid; -\d mx_testing_schema.mx_test_table -SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; - --- Make sure that start_metadata_sync_to_node cannot be called inside a transaction -\c - - - :master_port -BEGIN; -SELECT start_metadata_sync_to_node('localhost', :worker_2_port); -ROLLBACK; - -SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; - --- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false -\c - - - :master_port -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; - --- Cleanup -\c - - - :worker_1_port -DROP TABLE mx_testing_schema.mx_test_table; -DELETE FROM pg_dist_node; -DELETE FROM pg_dist_partition; -DELETE FROM pg_dist_shard; -DELETE FROM pg_dist_shard_placement; -\d mx_testing_schema.mx_test_table - -\c - - - :master_port -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); -DROP TABLE mx_testing_schema.mx_test_table CASCADE; - -ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql new file mode 100644 index 000000000..f59ee8d15 --- /dev/null +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -0,0 +1,405 @@ +-- +-- MULTI_METADATA_SYNC +-- + +-- Tests for metadata snapshot functions, metadata syncing functions and propagation of +-- metadata changes to MX tables. 
+ + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1310000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000; + +SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id +\gset +ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000; + +-- Create the necessary test utility function +CREATE FUNCTION master_metadata_snapshot() + RETURNS text[] + LANGUAGE C STRICT + AS 'citus'; + +COMMENT ON FUNCTION master_metadata_snapshot() + IS 'commands to create the metadata snapshot'; + +-- Show that none of the existing tables are qualified to be MX tables +SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; + +-- Show that, with no MX tables, metadata snapshot contains only the delete commands and +-- pg_dist_node entries +SELECT unnest(master_metadata_snapshot()); + +-- Create a test table with constraints and SERIAL +CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 SERIAL); +SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash'); +SELECT master_create_worker_shards('mx_test_table', 8, 1); + +-- Set the replication model of the test table to streaming replication so that it is +-- considered as an MX table +UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass; + +-- Show that the created MX table is included in the metadata snapshot +SELECT unnest(master_metadata_snapshot()); + +-- Show that CREATE INDEX commands are included in the metadata snapshot +CREATE INDEX mx_index ON mx_test_table(col_2); +SELECT unnest(master_metadata_snapshot()); + +-- Show that schema changes are included in the metadata snapshot +CREATE SCHEMA mx_testing_schema; +ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema; +SELECT unnest(master_metadata_snapshot()); + +-- Show that append distributed tables are not included in the metadata snapshot +CREATE TABLE non_mx_test_table (col_1 int, col_2 text); +SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append'); +UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass; +SELECT unnest(master_metadata_snapshot()); + +-- Show that range distributed tables are not included in the metadata snapshot +UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; +SELECT unnest(master_metadata_snapshot()); + +-- Test start_metadata_sync_to_node UDF + +-- Ensure that hasmetadata=false for all nodes +SELECT count(*) FROM pg_dist_node WHERE hasmetadata=true; + +-- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port; + +-- Check that the metadata has been copied to the worker +\c - - - :worker_1_port +SELECT * FROM pg_dist_local_group; +SELECT * FROM pg_dist_node ORDER BY nodeid; +SELECT * FROM pg_dist_partition ORDER BY logicalrelid; +SELECT * FROM pg_dist_shard ORDER BY shardid; +SELECT * FROM pg_dist_shard_placement ORDER BY shardid; +\d mx_testing_schema.mx_test_table + +-- Check that pg_dist_colocation is not synced +SELECT * FROM pg_dist_colocation ORDER BY colocationid; + +-- Make sure that truncate trigger has been set for the MX table on worker +SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; + +-- Make sure that start_metadata_sync_to_node considers foreign key constraints +\c - - 
- :master_port +SET citus.shard_replication_factor TO 1; + +CREATE SCHEMA mx_testing_schema_2; + +CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); +CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, + FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3)); + +SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1'); +SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +-- Check that foreign key metadata exists on the worker +\c - - - :worker_1_port +\d mx_testing_schema_2.fk_test_2 + +\c - - - :master_port +DROP TABLE mx_testing_schema_2.fk_test_2; +DROP TABLE mx_testing_schema.fk_test_1; + +RESET citus.shard_replication_factor; + +-- Check that repeated calls to start_metadata_sync_to_node have no side effects +\c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +\c - - - :worker_1_port +SELECT * FROM pg_dist_local_group; +SELECT * FROM pg_dist_node ORDER BY nodeid; +SELECT * FROM pg_dist_partition ORDER BY logicalrelid; +SELECT * FROM pg_dist_shard ORDER BY shardid; +SELECT * FROM pg_dist_shard_placement ORDER BY shardid; +\d mx_testing_schema.mx_test_table +SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; + +-- Make sure that start_metadata_sync_to_node cannot be called inside a transaction +\c - - - :master_port +BEGIN; +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); +ROLLBACK; + +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; + +-- Check that the distributed table can be queried from the worker +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +CREATE TABLE mx_query_test (a int, b text, c int); +SELECT create_distributed_table('mx_query_test', 'a'); + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_query_test'::regclass; + +INSERT INTO mx_query_test VALUES (1, 'one', 1); +INSERT INTO mx_query_test VALUES (2, 'two', 4); +INSERT INTO mx_query_test VALUES (3, 'three', 9); +INSERT INTO mx_query_test VALUES (4, 'four', 16); +INSERT INTO mx_query_test VALUES (5, 'five', 24); + +\c - - - :worker_1_port +SELECT * FROM mx_query_test ORDER BY a; +INSERT INTO mx_query_test VALUES (6, 'six', 36); +UPDATE mx_query_test SET c = 25 WHERE a = 5; + +\c - - - :master_port +SELECT * FROM mx_query_test ORDER BY a; + +\c - - - :master_port +DROP TABLE mx_query_test; + +-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false +\c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; + + +-- Test DDL propagation in MX tables +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SET citus.shard_count = 5; +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE SCHEMA mx_test_schema_1; +CREATE SCHEMA mx_test_schema_2; + +-- Create MX tables +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); +CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1); + +CREATE TABLE mx_test_schema_2.mx_table_2 (col1 
int, col2 text); +CREATE INDEX mx_index_2 ON mx_test_schema_2.mx_table_2 (col2); +ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col1) REFERENCES mx_test_schema_1.mx_table_1(col1); + +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 + +SELECT create_distributed_table('mx_test_schema_1.mx_table_1', 'col1'); +SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1'); + +-- Check that created tables are marked as streaming replicated tables +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid; + +-- See the shards and placements of the mx tables +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + +-- Check that metadata of MX tables exist on the metadata worker +\c - - - :worker_1_port + +-- Check that tables are created +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 + +-- Check that table metadata are created +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + +-- Check that shard and placement data are created +SELECT + logicalrelid, shardid, nodename, nodeport +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid, shardid; + +-- Check that metadata of MX tables don't exist on the non-metadata worker +\c - - - :worker_2_port + +\d mx_test_schema_1.mx_table_1 +\d mx_test_schema_2.mx_table_2 + +SELECT * FROM pg_dist_partition; +SELECT * FROM pg_dist_shard; +SELECT * FROM pg_dist_shard_placement; + +-- Check that CREATE INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1); +CREATE UNIQUE INDEX mx_index_4 ON mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 + +-- Check that DROP INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +DROP INDEX mx_test_schema_2.mx_index_3; +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 + +-- Check that ALTER TABLE statements are propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; +ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 + +-- Check that foreign key constraint with NOT VALID works as well +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint_2 +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1) +NOT VALID; +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 + +-- Check that 
mark_tables_colocated call propagates the changes to the workers +\c - - - :master_port +SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; +SET citus.shard_count TO 7; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE mx_colocation_test_1 (a int); +SELECT create_distributed_table('mx_colocation_test_1', 'a'); + +CREATE TABLE mx_colocation_test_2 (a int); +SELECT create_distributed_table('mx_colocation_test_2', 'a'); + +-- Check the colocation IDs of the created tables +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass +ORDER BY logicalrelid; + +-- Reset the colocation IDs of the test tables +DELETE FROM + pg_dist_colocation +WHERE EXISTS ( + SELECT 1 + FROM pg_dist_partition + WHERE + colocationid = pg_dist_partition.colocationid + AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); +UPDATE + pg_dist_partition +SET + colocationid = 0 +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + +-- Mark tables colocated and see the changes on the master and the worker +SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']); +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; +\c - - - :worker_1_port +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + +\c - - - :master_port + +-- Check that DROP TABLE on MX tables works +DROP TABLE mx_colocation_test_1; +DROP TABLE mx_colocation_test_2; +\d mx_colocation_test_1 +\d mx_colocation_test_2 + +\c - - - :worker_1_port +\d mx_colocation_test_1 +\d mx_colocation_test_2 + +-- Check that dropped MX table can be recreated again +\c - - - :master_port +SET citus.shard_count TO 7; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE mx_temp_drop_test (a int); +SELECT create_distributed_table('mx_temp_drop_test', 'a'); +SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass; + +DROP TABLE mx_temp_drop_test; + +CREATE TABLE mx_temp_drop_test (a int); +SELECT create_distributed_table('mx_temp_drop_test', 'a'); +SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_temp_drop_test'::regclass; + +DROP TABLE mx_temp_drop_test; + +-- Cleanup +\c - - - :worker_1_port +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; +DROP TABLE mx_testing_schema.mx_test_table; +DELETE FROM pg_dist_node; +DELETE FROM pg_dist_partition; +DELETE FROM pg_dist_shard; +DELETE FROM pg_dist_shard_placement; +\d mx_testing_schema.mx_test_table +\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; +DROP TABLE mx_testing_schema.mx_test_table; + +RESET citus.shard_count; +RESET citus.shard_replication_factor; +RESET citus.multi_shard_commit_protocol; + +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; +ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 
:last_placement_id;
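
For reference, a minimal sketch of the MX workflow these regression tests exercise end to end, assuming a coordinator running the 6.1-9 extension with a worker already registered in pg_dist_node; the table name events and the worker address localhost:5433 are illustrative and not taken from the patch:

-- With replication factor 1, create_distributed_table marks the table as
-- streaming replicated (repmodel 's'), i.e. an MX table.
SET citus.shard_replication_factor TO 1;
CREATE TABLE events (id int, payload text);
SELECT create_distributed_table('events', 'id');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid = 'events'::regclass;

-- Copy the metadata to a worker; the worker can then query and modify the
-- table directly, and subsequent DDL and DROP TABLE are propagated to it.
SELECT start_metadata_sync_to_node('localhost', 5433);
INSERT INTO events VALUES (1, 'created');
DROP TABLE events;

-- Mark the node as no longer having metadata when finished.
SELECT stop_metadata_sync_to_node('localhost', 5433);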