Propagate DDL commands to metadata workers for MX tables

pull/1045/head
Eren Basak 2016-12-18 13:57:19 +03:00
parent 048fddf4da
commit 71d73ec5ff
8 changed files with 415 additions and 47 deletions
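In short: for tables whose metadata is synced to the workers (MX tables), the coordinator now forwards DDL statements to those workers as well, first disabling DDL propagation there so the statement is applied locally instead of being re-propagated. A minimal sketch of the pattern, assembled from the hunks below (surrounding declarations and error handling omitted):

    /* sketch assembled from this commit's hunks, not a complete function */
    if (ShouldSyncTableMetadata(relationId))
    {
        /* replay the DDL on metadata workers without re-propagating it */
        SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
        SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString);
    }

    /* the command still runs against the individual shard placements afterwards */
    taskList = DDLTaskList(relationId, ddlCommandString);
    ExecuteModifyTasksWithoutResults(taskList);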

View File

@ -180,9 +180,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
List *commandList = GetDistributedTableDDLEvents(relationId);
ListCell *commandCell = NULL;
- /* disable DDL propagation on workers */
- SendCommandToWorkers(WORKERS_WITH_METADATA,
-                      "SET citus.enable_ddl_propagation TO off");
+ SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
foreach(commandCell, commandList)
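The foreach loop above is cut off by the hunk boundary; a sketch of what the loop body presumably looks like (an assumption based on the pattern used elsewhere in this commit, not shown in the diff):

    /* send the commands one by one */
    foreach(commandCell, commandList)
    {
        /* assumed loop body: forward each DDL event of the table to the metadata workers */
        char *ddlCommand = (char *) lfirst(commandCell);

        SendCommandToWorkers(WORKERS_WITH_METADATA, ddlCommand);
    }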

View File

@ -37,6 +37,7 @@
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
@ -49,6 +50,7 @@
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/transmit.h" #include "distributed/transmit.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "foreign/foreign.h" #include "foreign/foreign.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -107,6 +109,8 @@ static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
const char *dropIndexCommand, bool isTopLevel);
static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
const char *alterTableCommand, bool isTopLevel);
static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
const char *alterTableCommand);
static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand,
bool isTopLevel);
@ -147,6 +151,7 @@ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
static bool warnedUserAbout2PC = false;
/*
* Utility for handling citus specific concerns around utility statements.
*
@ -167,6 +172,8 @@ multi_ProcessUtility(Node *parsetree,
DestReceiver *dest,
char *completionTag)
{
bool schemaNode = SchemaNode();
bool propagateChanges = schemaNode && EnableDDLPropagation;
bool commandMustRunAsOwner = false;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
@ -222,8 +229,11 @@ multi_ProcessUtility(Node *parsetree,
ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree);
}
- /* ddl commands are propagated to workers only if EnableDDLPropagation is set */
- if (EnableDDLPropagation)
+ /*
+ * DDL commands are propagated to workers only if EnableDDLPropagation is
+ * set to true and the current node is the schema node
+ */
+ if (propagateChanges)
{
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
@ -289,6 +299,24 @@ multi_ProcessUtility(Node *parsetree,
"move all tables."))); "move all tables.")));
} }
} }
else if (!schemaNode)
{
if (IsA(parsetree, AlterTableStmt))
{
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
if (alterTableStmt->relkind == OBJECT_TABLE)
{
/*
* When the schema node issues an ALTER TABLE ... ADD FOREIGN KEY
* command, the validation step should be skipped on the worker's
* distributed table. Therefore, we check whether the given ALTER
* TABLE statement adds a FOREIGN KEY constraint and, if so, disable
* the validation step. Note that validation is done on the shard level.
*/
parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString);
}
}
}
/*
* Inform the user about potential caveats.
@ -858,6 +886,68 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
}
/*
* WorkerProcessAlterTableStmt checks and processes an ALTER TABLE statement
* that is run against a distributed table on a worker node. Currently, it
* only handles ALTER TABLE ... ADD FOREIGN KEY commands, for which it skips
* the validation step.
*/
static Node *
WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
const char *alterTableCommand)
{
LOCKMODE lockmode = 0;
Oid leftRelationId = InvalidOid;
bool isDistributedRelation = false;
List *commandList = NIL;
ListCell *commandCell = NULL;
/* first check whether a distributed relation is affected */
if (alterTableStatement->relation == NULL)
{
return (Node *) alterTableStatement;
}
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
if (!OidIsValid(leftRelationId))
{
return (Node *) alterTableStatement;
}
isDistributedRelation = IsDistributedTable(leftRelationId);
if (!isDistributedRelation)
{
return (Node *) alterTableStatement;
}
/*
* We check if there is an ADD FOREIGN KEY constraint command in the
* subcommand list. If there is, we set skip_validation to true to prevent
* PostgreSQL from verifying the validity of the foreign constraint on this
* table; validity will be checked on the shards anyway.
*/
commandList = alterTableStatement->cmds;
foreach(commandCell, commandList)
{
AlterTableCmd *command = (AlterTableCmd *) lfirst(commandCell);
AlterTableType alterTableType = command->subtype;
if (alterTableType == AT_AddConstraint)
{
Constraint *constraint = (Constraint *) command->def;
if (constraint->contype == CONSTR_FOREIGN)
{
/* foreign key constraint validation will be done on the shards */
constraint->skip_validation = true;
}
}
}
return (Node *) alterTableStatement;
}
/*
* ProcessAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed
* objects. The function first checks if the statement belongs to a distributed objects
@ -1866,6 +1956,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
bool isTopLevel)
{
List *taskList = NIL;
bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
if (XactModificationLevel == XACT_MODIFICATION_DATA)
{
@ -1877,6 +1968,12 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
ShowNoticeIfNotUsing2PC();
if (shouldSyncMetadata)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString);
}
taskList = DDLTaskList(relationId, ddlCommandString);
ExecuteModifyTasksWithoutResults(taskList);
@ -1900,6 +1997,7 @@ ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId,
const char *ddlCommandString, bool isTopLevel)
{
List *taskList = NIL;
bool shouldSyncMetadata = false;
if (XactModificationLevel == XACT_MODIFICATION_DATA)
{
@ -1911,6 +2009,18 @@ ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId,
ShowNoticeIfNotUsing2PC();
/*
* It is sufficient to check only one of the tables for metadata syncing on workers,
* since the colocation of two tables implies that either both or none of them have
* metadata on workers.
*/
shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId);
if (shouldSyncMetadata)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString);
}
taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString);
ExecuteModifyTasksWithoutResults(taskList);

View File

@ -847,6 +847,17 @@ ShardStorageType(Oid relationId)
}
/*
* SchemaNode function returns true if this node is identified as the
* schema/coordinator/master node of the cluster.
*/
bool
SchemaNode(void)
{
return (GetLocalGroupId() == 0);
}
/*
* WorkerNodeGetDatum converts the worker node passed to it into its datum
* representation. To do this, the function first creates the heap tuple from

View File

@ -1377,6 +1377,7 @@ GetLocalGroupId(void)
TupleDesc tupleDescriptor = NULL;
Oid groupId = InvalidOid;
Relation pgDistLocalGroupId = NULL;
Oid localGroupTableOid = InvalidOid;
/*
* Already set the group id, no need to read the heap again.
@ -1386,7 +1387,13 @@ GetLocalGroupId(void)
return LocalGroupId;
}
- pgDistLocalGroupId = heap_open(DistLocalGroupIdRelationId(), AccessShareLock);
+ localGroupTableOid = get_relname_relid("pg_dist_local_group", PG_CATALOG_NAMESPACE);
+ if (localGroupTableOid == InvalidOid)
+ {
+ return 0;
+ }
+ pgDistLocalGroupId = heap_open(localGroupTableOid, AccessShareLock);
scanDescriptor = systable_beginscan(pgDistLocalGroupId,
InvalidOid, false,

View File

@ -91,6 +91,8 @@ extern int ShardMaxSize;
extern int ShardPlacementPolicy;
extern bool SchemaNode(void);
/* Function declarations local to the distributed module */
extern bool CStoreTable(Oid relationId);
extern uint64 GetNextShardId(void);

View File

@ -34,6 +34,7 @@ extern char * NodeDeleteCommand(uint32 nodeId);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#endif /* METADATA_SYNC_H */

View File

@ -251,6 +251,7 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table':
(1 row)
-- Make sure that start_metadata_sync_to_node considers foreign key constraints
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
CREATE SCHEMA mx_testing_schema_2;
CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3));
@ -268,11 +269,6 @@ SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1');
(1 row)
- UPDATE
-     pg_dist_partition SET repmodel='s'
- WHERE
-     logicalrelid='mx_testing_schema.fk_test_1'::regclass
-     OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 start_metadata_sync_to_node
@ -292,7 +288,11 @@ Table "mx_testing_schema_2.fk_test_2"
Foreign-key constraints:
"fk_test_2_col1_fkey" FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3)
- DROP TABLE mx_testing_schema_2.fk_test_2;
- DROP TABLE mx_testing_schema.fk_test_1;
\c - - - :master_port
+ DROP TABLE mx_testing_schema_2.fk_test_2;
+ DROP TABLE mx_testing_schema.fk_test_1;
RESET citus.shard_replication_factor;
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
\c - - - :master_port
@ -383,6 +383,62 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
f
(1 row)
-- Check that the distributed table can be queried from the worker
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
CREATE TABLE mx_query_test (a int, b text, c int);
SELECT create_distributed_table('mx_query_test', 'a');
create_distributed_table
--------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_query_test'::regclass;
repmodel
----------
s
(1 row)
INSERT INTO mx_query_test VALUES (1, 'one', 1);
INSERT INTO mx_query_test VALUES (2, 'two', 4);
INSERT INTO mx_query_test VALUES (3, 'three', 9);
INSERT INTO mx_query_test VALUES (4, 'four', 16);
INSERT INTO mx_query_test VALUES (5, 'five', 24);
\c - - - :worker_1_port
SELECT * FROM mx_query_test ORDER BY a;
a | b | c
---+-------+----
1 | one | 1
2 | two | 4
3 | three | 9
4 | four | 16
5 | five | 24
(5 rows)
INSERT INTO mx_query_test VALUES (6, 'six', 36);
UPDATE mx_query_test SET c = 25 WHERE a = 5;
\c - - - :master_port
SELECT * FROM mx_query_test ORDER BY a;
a | b | c
---+-------+----
1 | one | 1
2 | two | 4
3 | three | 9
4 | four | 16
5 | five | 25
6 | six | 36
(6 rows)
\c - - - :worker_1_port
DROP TABLE mx_query_test;
\c - - - :master_port
DROP TABLE mx_query_test;
-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false
\c - - - :master_port
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
@ -417,6 +473,7 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
(1 row)
SET citus.shard_count = 5;
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE SCHEMA mx_test_schema_1;
CREATE SCHEMA mx_test_schema_2;
-- Create MX tables
@ -468,7 +525,9 @@ FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
- OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
+ OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
+ ORDER BY
+ logicalrelid;
logicalrelid | repmodel
-----------------------------+----------
mx_test_schema_1.mx_table_1 | s
@ -487,16 +546,16 @@ ORDER BY
logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport
-----------------------------+---------+-----------+----------
- mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637
- mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638
- mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637
- mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638
- mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637
- mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637
- mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638
- mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637
- mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638
- mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637
+ mx_test_schema_1.mx_table_1 | 1310104 | localhost | 57637
+ mx_test_schema_1.mx_table_1 | 1310105 | localhost | 57638
+ mx_test_schema_1.mx_table_1 | 1310106 | localhost | 57637
+ mx_test_schema_1.mx_table_1 | 1310107 | localhost | 57638
+ mx_test_schema_1.mx_table_1 | 1310108 | localhost | 57637
+ mx_test_schema_2.mx_table_2 | 1310109 | localhost | 57637
+ mx_test_schema_2.mx_table_2 | 1310110 | localhost | 57638
+ mx_test_schema_2.mx_table_2 | 1310111 | localhost | 57637
+ mx_test_schema_2.mx_table_2 | 1310112 | localhost | 57638
+ mx_test_schema_2.mx_table_2 | 1310113 | localhost | 57637
(10 rows)
@ -552,16 +611,16 @@ ORDER BY
logicalrelid, shardid;
logicalrelid | shardid | nodename | nodeport
-----------------------------+---------+-----------+----------
- mx_test_schema_1.mx_table_1 | 1310008 | localhost | 57637
- mx_test_schema_1.mx_table_1 | 1310009 | localhost | 57638
- mx_test_schema_1.mx_table_1 | 1310010 | localhost | 57637
- mx_test_schema_1.mx_table_1 | 1310011 | localhost | 57638
- mx_test_schema_1.mx_table_1 | 1310012 | localhost | 57637
- mx_test_schema_2.mx_table_2 | 1310013 | localhost | 57637
- mx_test_schema_2.mx_table_2 | 1310014 | localhost | 57638
- mx_test_schema_2.mx_table_2 | 1310015 | localhost | 57637
- mx_test_schema_2.mx_table_2 | 1310016 | localhost | 57638
- mx_test_schema_2.mx_table_2 | 1310017 | localhost | 57637
+ mx_test_schema_1.mx_table_1 | 1310104 | localhost | 57637
+ mx_test_schema_1.mx_table_1 | 1310105 | localhost | 57638
+ mx_test_schema_1.mx_table_1 | 1310106 | localhost | 57637
+ mx_test_schema_1.mx_table_1 | 1310107 | localhost | 57638
+ mx_test_schema_1.mx_table_1 | 1310108 | localhost | 57637
+ mx_test_schema_2.mx_table_2 | 1310109 | localhost | 57637
+ mx_test_schema_2.mx_table_2 | 1310110 | localhost | 57638
+ mx_test_schema_2.mx_table_2 | 1310111 | localhost | 57637
+ mx_test_schema_2.mx_table_2 | 1310112 | localhost | 57638
+ mx_test_schema_2.mx_table_2 | 1310113 | localhost | 57637
(10 rows)
-- Check that metadata of MX tables don't exist on the non-metadata worker
@ -583,10 +642,106 @@ SELECT * FROM pg_dist_shard_placement;
---------+------------+-------------+----------+----------+-------------
(0 rows)
-- Check that CREATE INDEX statement is propagated
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1);
WARNING: hash indexes are not WAL-logged and their use is discouraged
CREATE UNIQUE INDEX mx_index_4 ON mx_test_schema_2.mx_table_2(col1);
\c - - - :worker_1_port
\d mx_test_schema_2.mx_table_2
Table "mx_test_schema_2.mx_table_2"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
Indexes:
"mx_index_4" UNIQUE, btree (col1)
"mx_index_2" btree (col2)
"mx_index_3" hash (col1)
Foreign-key constraints:
"mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
-- Check that DROP INDEX statement is propagated
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
DROP INDEX mx_test_schema_2.mx_index_3;
\c - - - :worker_1_port
\d mx_test_schema_2.mx_table_2
Table "mx_test_schema_2.mx_table_2"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
Indexes:
"mx_index_4" UNIQUE, btree (col1)
"mx_index_2" btree (col2)
Foreign-key constraints:
"mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
-- Check that ALTER TABLE statements are propagated
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC;
ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT;
ALTER TABLE
mx_test_schema_1.mx_table_1
ADD CONSTRAINT
mx_fk_constraint
FOREIGN KEY
(col1)
REFERENCES
mx_test_schema_2.mx_table_2(col1);
\c - - - :worker_1_port
\d mx_test_schema_1.mx_table_1
Table "mx_test_schema_1.mx_table_1"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
col3 | integer |
Indexes:
"mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1)
"mx_index_1" btree (col1)
Foreign-key constraints:
"mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1)
Referenced by:
TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
-- Check that foreign key constraint with NOT VALID works as well
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint;
ALTER TABLE
mx_test_schema_1.mx_table_1
ADD CONSTRAINT
mx_fk_constraint_2
FOREIGN KEY
(col1)
REFERENCES
mx_test_schema_2.mx_table_2(col1)
NOT VALID;
\c - - - :worker_1_port
\d mx_test_schema_1.mx_table_1
Table "mx_test_schema_1.mx_table_1"
Column | Type | Modifiers
--------+---------+-----------
col1 | integer |
col2 | text |
col3 | integer |
Indexes:
"mx_table_1_col1_key" UNIQUE CONSTRAINT, btree (col1)
"mx_index_1" btree (col1)
Foreign-key constraints:
"mx_fk_constraint_2" FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) NOT VALID
Referenced by:
TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
-- Cleanup
\c - - - :worker_1_port
- DROP TABLE mx_test_schema_2.mx_table_2;
- DROP TABLE mx_test_schema_1.mx_table_1;
+ DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
+ NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
+ DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DELETE FROM pg_dist_node;
DELETE FROM pg_dist_partition;
@ -606,9 +761,11 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
(1 row)
- DROP TABLE mx_test_schema_2.mx_table_2;
- DROP TABLE mx_test_schema_1.mx_table_1;
+ DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
+ NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
+ DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
+ RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;

View File

@ -85,6 +85,7 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid;
SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass;
-- Make sure that start_metadata_sync_to_node considers foreign key constraints
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
CREATE SCHEMA mx_testing_schema_2;
@ -95,19 +96,19 @@ CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text,
SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1');
SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1');
- UPDATE
-     pg_dist_partition SET repmodel='s'
- WHERE
-     logicalrelid='mx_testing_schema.fk_test_1'::regclass
-     OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- Check that foreign key metadata exists on the worker
\c - - - :worker_1_port
\d mx_testing_schema_2.fk_test_2
- DROP TABLE mx_testing_schema_2.fk_test_2;
- DROP TABLE mx_testing_schema.fk_test_1;
\c - - - :master_port
+ DROP TABLE mx_testing_schema_2.fk_test_2;
+ DROP TABLE mx_testing_schema.fk_test_1;
RESET citus.shard_replication_factor;
@ -132,6 +133,35 @@ ROLLBACK;
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
-- Check that the distributed table can be queried from the worker
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
CREATE TABLE mx_query_test (a int, b text, c int);
SELECT create_distributed_table('mx_query_test', 'a');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_query_test'::regclass;
INSERT INTO mx_query_test VALUES (1, 'one', 1);
INSERT INTO mx_query_test VALUES (2, 'two', 4);
INSERT INTO mx_query_test VALUES (3, 'three', 9);
INSERT INTO mx_query_test VALUES (4, 'four', 16);
INSERT INTO mx_query_test VALUES (5, 'five', 24);
\c - - - :worker_1_port
SELECT * FROM mx_query_test ORDER BY a;
INSERT INTO mx_query_test VALUES (6, 'six', 36);
UPDATE mx_query_test SET c = 25 WHERE a = 5;
\c - - - :master_port
SELECT * FROM mx_query_test ORDER BY a;
\c - - - :worker_1_port
DROP TABLE mx_query_test;
\c - - - :master_port
DROP TABLE mx_query_test;
-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false
\c - - - :master_port
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
@ -143,6 +173,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
-- Test DDL propagation in MX tables
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET citus.shard_count = 5;
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE SCHEMA mx_test_schema_1;
CREATE SCHEMA mx_test_schema_2;
@ -168,7 +199,9 @@ FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass
- OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass;
+ OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass
+ ORDER BY
+ logicalrelid;
-- See the shards and placements of the mx tables
SELECT
@ -218,10 +251,58 @@ SELECT * FROM pg_dist_partition;
SELECT * FROM pg_dist_shard;
SELECT * FROM pg_dist_shard_placement;
-- Check that CREATE INDEX statement is propagated
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1);
CREATE UNIQUE INDEX mx_index_4 ON mx_test_schema_2.mx_table_2(col1);
\c - - - :worker_1_port
\d mx_test_schema_2.mx_table_2
-- Check that DROP INDEX statement is propagated
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
DROP INDEX mx_test_schema_2.mx_index_3;
\c - - - :worker_1_port
\d mx_test_schema_2.mx_table_2
-- Check that ALTER TABLE statements are propagated
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC;
ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT;
ALTER TABLE
mx_test_schema_1.mx_table_1
ADD CONSTRAINT
mx_fk_constraint
FOREIGN KEY
(col1)
REFERENCES
mx_test_schema_2.mx_table_2(col1);
\c - - - :worker_1_port
\d mx_test_schema_1.mx_table_1
-- Check that foreign key constraint with NOT VALID works as well
\c - - - :master_port
SET citus.multi_shard_commit_protocol TO '2pc';
ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint;
ALTER TABLE
mx_test_schema_1.mx_table_1
ADD CONSTRAINT
mx_fk_constraint_2
FOREIGN KEY
(col1)
REFERENCES
mx_test_schema_2.mx_table_2(col1)
NOT VALID;
\c - - - :worker_1_port
\d mx_test_schema_1.mx_table_1
-- Cleanup
\c - - - :worker_1_port
- DROP TABLE mx_test_schema_2.mx_table_2;
- DROP TABLE mx_test_schema_1.mx_table_1;
+ DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
+ DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DELETE FROM pg_dist_node;
DELETE FROM pg_dist_partition;
@ -231,11 +312,12 @@ DELETE FROM pg_dist_shard_placement;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
- DROP TABLE mx_test_schema_2.mx_table_2;
- DROP TABLE mx_test_schema_1.mx_table_1;
+ DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
+ DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
+ RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;