Make reference table metadata synced to workers

pull/1103/head
Eren Basak 2017-01-05 16:46:11 +03:00
parent e44d226221
commit 23b2619412
7 changed files with 152 additions and 31 deletions

View File

@ -63,11 +63,11 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
/*
* start_metadata_sync_to_node function creates the metadata in a worker for preparing the
* worker for accepting MX-table queries. The function first sets the localGroupId of the
* worker so that the worker knows which tuple in pg_dist_node table represents itself.
* After that, SQL statetemens for re-creating metadata about mx distributed
* tables are sent to the worker. Finally, the hasmetadata column of the target node in
* pg_dist_node is marked as true.
* worker for accepting queries. The function first sets the localGroupId of the worker
* so that the worker knows which tuple in pg_dist_node table represents itself. After
* that, SQL statetemens for re-creating metadata of MX-eligible distributed tables are
* sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node
* is marked as true.
*/
Datum
start_metadata_sync_to_node(PG_FUNCTION_ARGS)
@ -132,7 +132,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
/*
* stop_metadata_sync_to_node function sets the hasmetadata column of the specified node
* to false in pg_dist_node table, thus indicating that the specified worker node does not
* receive DDL changes anymore and cannot be used for issuing mx queries.
* receive DDL changes anymore and cannot be used for issuing queries.
*/
Datum
stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
@ -159,19 +159,24 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
/*
* ShouldSyncTableMetadata checks if a distributed table has streaming replication model
* and hash distribution. In that case the distributed table is considered an MX table,
* and its metadata is required to exist on the worker nodes.
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* propagated to metadata workers, i.e. the table is an MX table or reference table.
* Tables with streaming replication model (which means RF=1) and hash distribution are
* considered as MX tables while tables with none distribution are reference tables.
*/
bool
ShouldSyncTableMetadata(Oid relationId)
{
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId);
bool usesHashDistribution = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
bool usesStreamingReplication =
bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
bool streamingReplicated =
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
if (usesStreamingReplication && usesHashDistribution)
bool mxTable = (streamingReplicated && hashDistributed);
bool referenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE);
if (mxTable || referenceTable)
{
return true;
}
@ -199,7 +204,7 @@ MetadataCreateCommands(void)
{
List *metadataSnapshotCommandList = NIL;
List *distributedTableList = DistributedTableList();
List *mxTableList = NIL;
List *propagatedTableList = NIL;
List *workerNodeList = WorkerNodeList();
ListCell *distributedTableCell = NULL;
char *nodeListInsertCommand = NULL;
@ -209,19 +214,19 @@ MetadataCreateCommands(void)
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
nodeListInsertCommand);
/* create the list of mx tables */
/* create the list of tables whose metadata will be created */
foreach(distributedTableCell, distributedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
mxTableList = lappend(mxTableList, cacheEntry);
propagatedTableList = lappend(propagatedTableList, cacheEntry);
}
}
/* create the mx tables, but not the metadata */
foreach(distributedTableCell, mxTableList)
/* create the tables, but not the metadata */
foreach(distributedTableCell, propagatedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
@ -240,7 +245,7 @@ MetadataCreateCommands(void)
}
/* construct the foreign key constraints after all tables are created */
foreach(distributedTableCell, mxTableList)
foreach(distributedTableCell, propagatedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
@ -253,7 +258,7 @@ MetadataCreateCommands(void)
}
/* after all tables are created, create the metadata */
foreach(distributedTableCell, mxTableList)
foreach(distributedTableCell, propagatedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
@ -323,7 +328,7 @@ GetDistributedTableDDLEvents(Oid relationId)
metadataCommand = DistributionCreateCommand(cacheEntry);
commandList = lappend(commandList, metadataCommand);
/* commands to create the truncate trigger of the mx table */
/* commands to create the truncate trigger of the table */
truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
@ -597,7 +602,6 @@ ShardListInsertCommand(List *shardIntervalList)
appendStringInfo(minHashToken, "NULL");
}
if (shardInterval->maxValueExists)
{
appendStringInfo(maxHashToken, "'%d'", DatumGetInt32(

View File

@ -1221,19 +1221,56 @@ WHERE
SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
-- Cleanup
SELECT worker_drop_distributed_table('mx_ref'::regclass);
worker_drop_distributed_table
-------------------------------
(1 row)
-- Check that DDL commands are propagated to reference tables on workers
\c - - - :master_port
ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
CREATE INDEX mx_ref_index ON mx_ref(col_1);
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
col_3 | numeric | default 0
Indexes:
"mx_ref_index" btree (col_1)
\c - - - :worker_1_port
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
col_3 | numeric | default 0
Indexes:
"mx_ref_index" btree (col_1)
-- Check that metada is cleaned successfully upon drop table
\c - - - :master_port
DROP TABLE mx_ref;
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
(0 rows)
SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+----------+----------+-------------
(0 rows)
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------

View File

@ -1018,6 +1018,9 @@ ORDER BY s.logicalrelid, sp.shardstate;
reference_failure_test | 1 | 2
(1 row)
-- connect back to the worker and set rename the test_user back
\c - :default_user - :worker_1_port
ALTER USER test_user_new RENAME TO test_user;
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;

View File

@ -28,6 +28,13 @@ SELECT create_distributed_table('mx_table_2', 'col_1');
(1 row)
CREATE TABLE mx_ref_table (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref_table');
create_reference_table
------------------------
(1 row)
-- Check that the created tables are colocated MX tables
SELECT logicalrelid, repmodel, colocationid
FROM pg_dist_partition
@ -46,6 +53,9 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
(1 row)
COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
INSERT INTO mx_ref_table VALUES (-37, 'morbi');
INSERT INTO mx_ref_table VALUES (-78, 'sapien');
INSERT INTO mx_ref_table VALUES (-34, 'augue');
SELECT * FROM mx_table ORDER BY col_1;
col_1 | col_2 | col_3
-------+----------+-------
@ -98,6 +108,35 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
5
(1 row)
-- INSERT/UPDATE/DELETE on reference tables
SELECT * FROM mx_ref_table ORDER BY col_1;
col_1 | col_2
-------+--------
-78 | sapien
-37 | morbi
-34 | augue
(3 rows)
INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum');
ERROR: cannot perform distributed planning for the given modification
DETAIL: Modifications to reference tables are supported only from the schema node.
UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Modifications to reference tables are supported only from the schema node.
DELETE FROM mx_ref_table WHERE col_1 = -78;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Modifications to reference tables are supported only from the schema node.
SELECT * FROM mx_ref_table ORDER BY col_1;
col_1 | col_2
-------+--------
-78 | sapien
-37 | morbi
-34 | augue
(3 rows)
\c - - - :master_port
DROP TABLE mx_ref_table;
\c - - - :worker_1_port
-- DDL commands
\d mx_table
Table "public.mx_table"

View File

@ -525,14 +525,30 @@ WHERE
SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
-- Cleanup
SELECT worker_drop_distributed_table('mx_ref'::regclass);
-- Check that DDL commands are propagated to reference tables on workers
\c - - - :master_port
ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0;
CREATE INDEX mx_ref_index ON mx_ref(col_1);
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
-- Check that metada is cleaned successfully upon drop table
\c - - - :master_port
DROP TABLE mx_ref;
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);

View File

@ -758,6 +758,10 @@ AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- connect back to the worker and set rename the test_user back
\c - :default_user - :worker_1_port
ALTER USER test_user_new RENAME TO test_user;
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;

View File

@ -25,6 +25,9 @@ SELECT create_distributed_table('mx_table', 'col_1');
CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL);
SELECT create_distributed_table('mx_table_2', 'col_1');
CREATE TABLE mx_ref_table (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref_table');
-- Check that the created tables are colocated MX tables
SELECT logicalrelid, repmodel, colocationid
FROM pg_dist_partition
@ -41,6 +44,10 @@ COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
65832, 'amet'
\.
INSERT INTO mx_ref_table VALUES (-37, 'morbi');
INSERT INTO mx_ref_table VALUES (-78, 'sapien');
INSERT INTO mx_ref_table VALUES (-34, 'augue');
SELECT * FROM mx_table ORDER BY col_1;
-- Try commands from metadata worker
@ -73,6 +80,17 @@ INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp;
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
-- INSERT/UPDATE/DELETE on reference tables
SELECT * FROM mx_ref_table ORDER BY col_1;
INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum');
UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37;
DELETE FROM mx_ref_table WHERE col_1 = -78;
SELECT * FROM mx_ref_table ORDER BY col_1;
\c - - - :master_port
DROP TABLE mx_ref_table;
\c - - - :worker_1_port
-- DDL commands
\d mx_table
CREATE INDEX mx_test_index ON mx_table(col_1);