From 71d73ec5fffbb84ff2787c40f2f340e7c46e9df5 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Sun, 18 Dec 2016 13:57:19 +0300 Subject: [PATCH] Propagate DDL commands to metadata workers for MX tables --- .../commands/create_distributed_table.c | 4 +- .../distributed/executor/multi_utility.c | 114 ++++++++- .../distributed/master/master_node_protocol.c | 11 + .../distributed/utils/metadata_cache.c | 9 +- src/include/distributed/master_protocol.h | 2 + src/include/distributed/metadata_sync.h | 1 + .../regress/expected/multi_metadata_sync.out | 217 +++++++++++++++--- src/test/regress/sql/multi_metadata_sync.sql | 104 ++++++++- 8 files changed, 415 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 8bc516242..1f28ead9b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -180,9 +180,7 @@ create_distributed_table(PG_FUNCTION_ARGS) List *commandList = GetDistributedTableDDLEvents(relationId); ListCell *commandCell = NULL; - /* disable DDL propagation on workers */ - SendCommandToWorkers(WORKERS_WITH_METADATA, - "SET citus.enable_ddl_propagation TO off"); + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); /* send the commands one by one */ foreach(commandCell, commandList) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 4066ed8f0..9bab7b39b 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -37,6 +37,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_copy.h" #include "distributed/multi_join_order.h" #include "distributed/multi_planner.h" @@ -49,6 +50,7 @@ #include "distributed/transaction_management.h" #include "distributed/transmit.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" #include "executor/executor.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" @@ -107,6 +109,8 @@ static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, bool isTopLevel); static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, bool isTopLevel); +static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, + const char *alterTableCommand); static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, const char *alterObjectSchemaCommand, bool isTopLevel); @@ -147,6 +151,7 @@ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) static bool warnedUserAbout2PC = false; + /* * Utility for handling citus specific concerns around utility statements. * @@ -167,6 +172,8 @@ multi_ProcessUtility(Node *parsetree, DestReceiver *dest, char *completionTag) { + bool schemaNode = SchemaNode(); + bool propagateChanges = schemaNode && EnableDDLPropagation; bool commandMustRunAsOwner = false; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; @@ -222,8 +229,11 @@ multi_ProcessUtility(Node *parsetree, ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree); } - /* ddl commands are propagated to workers only if EnableDDLPropagation is set */ - if (EnableDDLPropagation) + /* + * DDL commands are propagated to workers only if EnableDDLPropagation is + * set to true and the current node is the schema node + */ + if (propagateChanges) { bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); @@ -289,6 +299,24 @@ multi_ProcessUtility(Node *parsetree, "move all tables."))); } } + else if (!schemaNode) + { + if (IsA(parsetree, AlterTableStmt)) + { + AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; + if (alterTableStmt->relkind == OBJECT_TABLE) + { + /* + * When the schema node issues an ALTER TABLE ... ADD FOREIGN KEY + * command, the validation step should be skipped on the distributed + * table of the worker. Therefore, we check whether the given ALTER + * TABLE statement is a FOREIGN KEY constraint and if so disable the + * validation step. Note that validation is done on the shard level. + */ + parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString); + } + } + } /* * Inform the user about potential caveats. @@ -858,6 +886,68 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl } +/* + * WorkerProcessAlterTableStmt checks and processes the alter table statement to be + * worked on the distributed table of the worker node. Currently, it only processes + * ALTER TABLE ... ADD FOREIGN KEY command to skip the validation step. + */ +static Node * +WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, + const char *alterTableCommand) +{ + LOCKMODE lockmode = 0; + Oid leftRelationId = InvalidOid; + bool isDistributedRelation = false; + List *commandList = NIL; + ListCell *commandCell = NULL; + + /* first check whether a distributed relation is affected */ + if (alterTableStatement->relation == NULL) + { + return (Node *) alterTableStatement; + } + + lockmode = AlterTableGetLockLevel(alterTableStatement->cmds); + leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode); + if (!OidIsValid(leftRelationId)) + { + return (Node *) alterTableStatement; + } + + isDistributedRelation = IsDistributedTable(leftRelationId); + if (!isDistributedRelation) + { + return (Node *) alterTableStatement; + } + + /* + * We check if there is a ADD FOREIGN CONSTRAINT command in sub commands list. + * If there is we assign referenced releation id to rightRelationId and we also + * set skip_validation to true to prevent PostgreSQL to verify validity of the + * foreign constraint in master. Validity will be checked in workers anyway. + */ + commandList = alterTableStatement->cmds; + + foreach(commandCell, commandList) + { + AlterTableCmd *command = (AlterTableCmd *) lfirst(commandCell); + AlterTableType alterTableType = command->subtype; + + if (alterTableType == AT_AddConstraint) + { + Constraint *constraint = (Constraint *) command->def; + if (constraint->contype == CONSTR_FOREIGN) + { + /* foreign constraint validations will be done in shards. */ + constraint->skip_validation = true; + } + } + } + + return (Node *) alterTableStatement; +} + + /* * ProcessAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed * objects. The function first checks if the statement belongs to a distributed objects @@ -1866,6 +1956,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, bool isTopLevel) { List *taskList = NIL; + bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId); if (XactModificationLevel == XACT_MODIFICATION_DATA) { @@ -1877,6 +1968,12 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, ShowNoticeIfNotUsing2PC(); + if (shouldSyncMetadata) + { + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); + } + taskList = DDLTaskList(relationId, ddlCommandString); ExecuteModifyTasksWithoutResults(taskList); @@ -1900,6 +1997,7 @@ ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, const char *ddlCommandString, bool isTopLevel) { List *taskList = NIL; + bool shouldSyncMetadata = false; if (XactModificationLevel == XACT_MODIFICATION_DATA) { @@ -1911,6 +2009,18 @@ ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, ShowNoticeIfNotUsing2PC(); + /* + * It is sufficient to check only one of the tables for metadata syncing on workers, + * since the colocation of two tables implies that either both or none of them have + * metadata on workers. + */ + shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId); + if (shouldSyncMetadata) + { + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); + } + taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString); ExecuteModifyTasksWithoutResults(taskList); diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 2ff2f5446..43c3332bf 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -847,6 +847,17 @@ ShardStorageType(Oid relationId) } +/* + * SchemaNode function returns true if this node is identified as the + * schema/coordinator/master node of the cluster. + */ +bool +SchemaNode(void) +{ + return (GetLocalGroupId() == 0); +} + + /* * WorkerNodeGetDatum converts the worker node passed to it into its datum * representation. To do this, the function first creates the heap tuple from diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 4aef00c46..bb1bd75f9 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1377,6 +1377,7 @@ GetLocalGroupId(void) TupleDesc tupleDescriptor = NULL; Oid groupId = InvalidOid; Relation pgDistLocalGroupId = NULL; + Oid localGroupTableOid = InvalidOid; /* * Already set the group id, no need to read the heap again. @@ -1386,7 +1387,13 @@ GetLocalGroupId(void) return LocalGroupId; } - pgDistLocalGroupId = heap_open(DistLocalGroupIdRelationId(), AccessShareLock); + localGroupTableOid = get_relname_relid("pg_dist_local_group", PG_CATALOG_NAMESPACE); + if (localGroupTableOid == InvalidOid) + { + return 0; + } + + pgDistLocalGroupId = heap_open(localGroupTableOid, AccessShareLock); scanDescriptor = systable_beginscan(pgDistLocalGroupId, InvalidOid, false, diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 7600093c3..fd56a0e46 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -91,6 +91,8 @@ extern int ShardMaxSize; extern int ShardPlacementPolicy; +extern bool SchemaNode(void); + /* Function declarations local to the distributed module */ extern bool CStoreTable(Oid relationId); extern uint64 GetNextShardId(void); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 0aaafc8cb..07492162b 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -34,6 +34,7 @@ extern char * NodeDeleteCommand(uint32 nodeId); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node" #define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \ "SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition" +#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" #endif /* METADATA_SYNC_H */ diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index cd1ef4a6c..4ec98dc1b 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -251,6 +251,7 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': (1 row) -- Make sure that start_metadata_sync_to_node considers foreign key constraints +\c - - - :master_port SET citus.shard_replication_factor TO 1; CREATE SCHEMA mx_testing_schema_2; CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); @@ -268,11 +269,6 @@ SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); (1 row) -UPDATE - pg_dist_partition SET repmodel='s' -WHERE - logicalrelid='mx_testing_schema.fk_test_1'::regclass - OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node @@ -292,7 +288,11 @@ Table "mx_testing_schema_2.fk_test_2" Foreign-key constraints: "fk_test_2_col1_fkey" FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3) +DROP TABLE mx_testing_schema_2.fk_test_2; +DROP TABLE mx_testing_schema.fk_test_1; \c - - - :master_port +DROP TABLE mx_testing_schema_2.fk_test_2; +DROP TABLE mx_testing_schema.fk_test_1; RESET citus.shard_replication_factor; -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port @@ -383,6 +383,62 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; f (1 row) +-- Check that the distributed table can be queried from the worker +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +CREATE TABLE mx_query_test (a int, b text, c int); +SELECT create_distributed_table('mx_query_test', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_query_test'::regclass; + repmodel +---------- + s +(1 row) + +INSERT INTO mx_query_test VALUES (1, 'one', 1); +INSERT INTO mx_query_test VALUES (2, 'two', 4); +INSERT INTO mx_query_test VALUES (3, 'three', 9); +INSERT INTO mx_query_test VALUES (4, 'four', 16); +INSERT INTO mx_query_test VALUES (5, 'five', 24); +\c - - - :worker_1_port +SELECT * FROM mx_query_test ORDER BY a; + a | b | c +---+-------+---- + 1 | one | 1 + 2 | two | 4 + 3 | three | 9 + 4 | four | 16 + 5 | five | 24 +(5 rows) + +INSERT INTO mx_query_test VALUES (6, 'six', 36); +UPDATE mx_query_test SET c = 25 WHERE a = 5; +\c - - - :master_port +SELECT * FROM mx_query_test ORDER BY a; + a | b | c +---+-------+---- + 1 | one | 1 + 2 | two | 4 + 3 | three | 9 + 4 | four | 16 + 5 | five | 25 + 6 | six | 36 +(6 rows) + +\c - - - :worker_1_port +DROP TABLE mx_query_test; +\c - - - :master_port +DROP TABLE mx_query_test; -- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -417,6 +473,7 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); (1 row) SET citus.shard_count = 5; +SET citus.multi_shard_commit_protocol TO '2pc'; CREATE SCHEMA mx_test_schema_1; CREATE SCHEMA mx_test_schema_2; -- Create MX tables @@ -468,7 +525,9 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass - OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid; logicalrelid | repmodel -----------------------------+---------- mx_test_schema_1.mx_table_1 | s @@ -487,16 +546,16 @@ ORDER BY logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport -----------------------------+---------+-----------+---------- - mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637 - mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638 - mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637 - mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638 - mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637 - mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637 - mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638 - mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637 - mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638 - mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310104 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310105 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310106 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310107 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310108 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310109 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310110 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310111 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310112 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310113 | localhost | 57637 (10 rows) @@ -552,16 +611,16 @@ ORDER BY logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport -----------------------------+---------+-----------+---------- - mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637 - mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638 - mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637 - mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638 - mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637 - mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637 - mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638 - mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637 - mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638 - mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310104 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310105 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310106 | localhost | 57637 + mx_test_schema_1.mx_table_1 | 1310107 | localhost | 57638 + mx_test_schema_1.mx_table_1 | 1310108 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310109 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310110 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310111 | localhost | 57637 + mx_test_schema_2.mx_table_2 | 1310112 | localhost | 57638 + mx_test_schema_2.mx_table_2 | 1310113 | localhost | 57637 (10 rows) -- Check that metadata of MX tables don't exist on the non-metadata worker @@ -583,10 +642,106 @@ SELECT * FROM pg_dist_shard_placement; ---------+------------+-------------+----------+----------+------------- (0 rows) +-- Check that CREATE INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1); +WARNING: hash indexes are not WAL-logged and their use is discouraged +CREATE UNIQUE INDEX mx_index_4 ON mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_4" UNIQUE, btree (col1) + "mx_index_2" btree (col2) + "mx_index_3" hash (col1) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that DROP INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +DROP INDEX mx_test_schema_2.mx_index_3; +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 +Table "mx_test_schema_2.mx_table_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | +Indexes: + "mx_index_4" UNIQUE, btree (col1) + "mx_index_2" btree (col2) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that ALTER TABLE statements are propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; +ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | + col3 | integer | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Foreign-key constraints: + "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + +-- Check that foreign key constraint with NOT VALID works as well +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint_2 +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1) +NOT VALID; +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 +Table "mx_test_schema_1.mx_table_1" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | text | + col3 | integer | +Indexes: + "mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1) + "mx_index_1" btree (col1) +Foreign-key constraints: + "mx_fk_constraint_2" FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) NOT VALID +Referenced by: + TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) + -- Cleanup \c - - - :worker_1_port -DROP TABLE mx_test_schema_2.mx_table_2; -DROP TABLE mx_test_schema_1.mx_table_1; +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; DELETE FROM pg_dist_node; DELETE FROM pg_dist_partition; @@ -606,9 +761,11 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); (1 row) -DROP TABLE mx_test_schema_2.mx_table_2; -DROP TABLE mx_test_schema_1.mx_table_1; +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; RESET citus.shard_count; RESET citus.shard_replication_factor; +RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 386d3c2c2..b9988bf30 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -85,6 +85,7 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; -- Make sure that start_metadata_sync_to_node considers foreign key constraints +\c - - - :master_port SET citus.shard_replication_factor TO 1; CREATE SCHEMA mx_testing_schema_2; @@ -95,19 +96,19 @@ CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1'); SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); - -UPDATE - pg_dist_partition SET repmodel='s' -WHERE - logicalrelid='mx_testing_schema.fk_test_1'::regclass - OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- Check that foreign key metadata exists on the worker \c - - - :worker_1_port \d mx_testing_schema_2.fk_test_2 + +DROP TABLE mx_testing_schema_2.fk_test_2; +DROP TABLE mx_testing_schema.fk_test_1; \c - - - :master_port +DROP TABLE mx_testing_schema_2.fk_test_2; +DROP TABLE mx_testing_schema.fk_test_1; + RESET citus.shard_replication_factor; @@ -132,6 +133,35 @@ ROLLBACK; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; +-- Check that the distributed table can be queried from the worker +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +CREATE TABLE mx_query_test (a int, b text, c int); +SELECT create_distributed_table('mx_query_test', 'a'); + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_query_test'::regclass; + +INSERT INTO mx_query_test VALUES (1, 'one', 1); +INSERT INTO mx_query_test VALUES (2, 'two', 4); +INSERT INTO mx_query_test VALUES (3, 'three', 9); +INSERT INTO mx_query_test VALUES (4, 'four', 16); +INSERT INTO mx_query_test VALUES (5, 'five', 24); + +\c - - - :worker_1_port +SELECT * FROM mx_query_test ORDER BY a; +INSERT INTO mx_query_test VALUES (6, 'six', 36); +UPDATE mx_query_test SET c = 25 WHERE a = 5; + +\c - - - :master_port +SELECT * FROM mx_query_test ORDER BY a; + +\c - - - :worker_1_port +DROP TABLE mx_query_test; +\c - - - :master_port +DROP TABLE mx_query_test; + -- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -143,6 +173,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; -- Test DDL propagation in MX tables SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SET citus.shard_count = 5; +SET citus.multi_shard_commit_protocol TO '2pc'; CREATE SCHEMA mx_test_schema_1; CREATE SCHEMA mx_test_schema_2; @@ -168,7 +199,9 @@ FROM pg_dist_partition WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass - OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; + OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass +ORDER BY + logicalrelid; -- See the shards and placements of the mx tables SELECT @@ -218,10 +251,58 @@ SELECT * FROM pg_dist_partition; SELECT * FROM pg_dist_shard; SELECT * FROM pg_dist_shard_placement; +-- Check that CREATE INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1); +CREATE UNIQUE INDEX mx_index_4 ON mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 + +-- Check that DROP INDEX statement is propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +DROP INDEX mx_test_schema_2.mx_index_3; +\c - - - :worker_1_port +\d mx_test_schema_2.mx_table_2 + +-- Check that ALTER TABLE statements are propagated +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; +ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1); +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 + +-- Check that foreign key constraint with NOT VALID works as well +\c - - - :master_port +SET citus.multi_shard_commit_protocol TO '2pc'; +ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint_2 +FOREIGN KEY + (col1) +REFERENCES + mx_test_schema_2.mx_table_2(col1) +NOT VALID; +\c - - - :worker_1_port +\d mx_test_schema_1.mx_table_1 + + -- Cleanup \c - - - :worker_1_port -DROP TABLE mx_test_schema_2.mx_table_2; -DROP TABLE mx_test_schema_1.mx_table_1; +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; DELETE FROM pg_dist_node; DELETE FROM pg_dist_partition; @@ -231,11 +312,12 @@ DELETE FROM pg_dist_shard_placement; \c - - - :master_port SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); -DROP TABLE mx_test_schema_2.mx_table_2; -DROP TABLE mx_test_schema_1.mx_table_1; +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; RESET citus.shard_count; RESET citus.shard_replication_factor; +RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;