From 9eff968d1f4a64f6de762742b07eb2d388c3c158 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Tue, 8 Nov 2016 16:35:41 -0800 Subject: [PATCH 1/7] Add start_metadata_sync_to_node UDF This change adds `start_metadata_sync_to_node` UDF which copies the metadata about nodes and MX tables from master to the specified worker, sets its local group ID and marks its hasmetadata to true to allow it to receive future DDL changes. --- src/backend/distributed/Makefile | 5 +- .../distributed/citus--6.0-18--6.1-1.sql | 12 ++ src/backend/distributed/citus.control | 2 +- .../distributed/metadata/metadata_sync.c | 152 ++++++++++++++++++ src/include/distributed/metadata_cache.h | 1 - src/test/regress/expected/multi_extension.out | 1 + src/test/regress/sql/multi_extension.sql | 1 + 7 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/citus--6.0-18--6.1-1.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index ee5b115ef..9b4ace629 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,8 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ + 6.1-1 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -94,6 +95,8 @@ $(EXTENSION)--6.0-17.sql: $(EXTENSION)--6.0-16.sql $(EXTENSION)--6.0-16--6.0-17. cat $^ > $@ $(EXTENSION)--6.0-18.sql: $(EXTENSION)--6.0-17.sql $(EXTENSION)--6.0-17--6.0-18.sql cat $^ > $@ +$(EXTENSION)--6.1-1.sql: $(EXTENSION)--6.0-18.sql $(EXTENSION)--6.0-18--6.1-1.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-18--6.1-1.sql b/src/backend/distributed/citus--6.0-18--6.1-1.sql new file mode 100644 index 000000000..44660e753 --- /dev/null +++ b/src/backend/distributed/citus--6.0-18--6.1-1.sql @@ -0,0 +1,12 @@ +/* citus--6.0-18--6.1-1.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$start_metadata_sync_to_node$$; +COMMENT ON FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer) + IS 'sync metadata to node'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 63d1859f1..74ed4aa9e 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-18' +default_version = '6.1-1' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 11f794e0c..7f5935b78 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -17,8 +17,12 @@ #include #include +#include "access/genam.h" #include "access/heapam.h" +#include "access/htup_details.h" +#include "access/xact.h" #include "catalog/dependency.h" +#include "catalog/indexing.h" #include "catalog/pg_foreign_server.h" #include "distributed/citus_ruleutils.h" #include
"distributed/distribution_column.h" @@ -27,14 +31,89 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" +#include "distributed/pg_dist_node.h" #include "distributed/worker_manager.h" +#include "distributed/worker_transaction.h" #include "foreign/foreign.h" #include "nodes/pg_list.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +static char * LocalGroupIdUpdateCommand(uint32 groupId); +static void MarkNodeHasMetadata(char *nodeName, int32 nodePort); + + +PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); + + +/* + * start_metadata_sync_to_node function creates the metadata in a worker for preparing the + * worker for accepting MX-table queries. The function first sets the localGroupId of the + * worker so that the worker knows which tuple in pg_dist_node table represents itself. + * After that, SQL statements for re-creating metadata about mx distributed + * tables are sent to the worker. Finally, the hasmetadata column of the target node in + * pg_dist_node is marked as true. + */ +Datum +start_metadata_sync_to_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeNameString = text_to_cstring(nodeName); + char *extensionOwner = CitusExtensionOwnerName(); + + WorkerNode *workerNode = NULL; + char *localGroupIdUpdateCommand = NULL; + List *recreateMetadataSnapshotCommandList = NIL; + List *dropMetadataCommandList = NIL; + List *createMetadataCommandList = NIL; + + EnsureSuperUser(); + + workerNode = FindWorkerNode(nodeNameString, nodePort); + + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("you cannot sync metadata to a non-existent node"), + errhint("First, add the node with SELECT master_add_node(%s,%d)", + nodeNameString, nodePort))); + } + + /* generate and add the local group id's update query */ + localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); + + /* generate the queries which drop the metadata */ + dropMetadataCommandList = MetadataDropCommands(); + + /* generate the queries which create the metadata from scratch */ + createMetadataCommandList = MetadataCreateCommands(); + + recreateMetadataSnapshotCommandList = lappend(recreateMetadataSnapshotCommandList, + localGroupIdUpdateCommand); + recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, + dropMetadataCommandList); + recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, + createMetadataCommandList); + + /* + * Send the snapshot recreation commands in a single remote transaction and + * error out in any kind of failure. Note that it is not required to send + * createMetadataSnapshotCommandList in the same transaction that we send + * nodeDeleteCommand and nodeInsertCommand commands below. + */ + SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner, + recreateMetadataSnapshotCommandList); + + MarkNodeHasMetadata(nodeNameString, nodePort); + + PG_RETURN_VOID(); +} + + /* * ShouldSyncTableMetadata checks if a distributed table has streaming replication model * and hash distribution. 
In that case the distributed table is considered an MX table, @@ -416,3 +495,76 @@ GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry) return commandList; } + + +/* + * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id + * of a worker and returns the command in a string. + */ +static char * +LocalGroupIdUpdateCommand(uint32 groupId) +{ + StringInfo updateCommand = makeStringInfo(); + + appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %d", + groupId); + + return updateCommand->data; +} + + +/* + * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in + * pg_dist_node to true. + */ +static void +MarkNodeHasMetadata(char *nodeName, int32 nodePort) +{ + const bool indexOK = false; + const int scanKeyCount = 2; + + Relation pgDistNode = NULL; + TupleDesc tupleDescriptor = NULL; + ScanKeyData scanKey[scanKeyCount]; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_node]; + bool isnull[Natts_pg_dist_node]; + bool replace[Natts_pg_dist_node]; + + pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistNode); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename, + BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport, + BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodePort)); + + scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", + nodeName, nodePort))); + } + + memset(replace, 0, sizeof(replace)); + + values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(true); + isnull[Anum_pg_dist_node_hasmetadata - 1] = false; + replace[Anum_pg_dist_node_hasmetadata - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + simple_heap_update(pgDistNode, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistNode, heapTuple); + + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + heap_close(pgDistNode, NoLock); +} diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index a1c946eb7..4b8753694 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -59,7 +59,6 @@ extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationI extern int GetLocalGroupId(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); -extern void CitusInvalidateNodeCache(void); extern bool CitusHasBeenLoaded(void); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 16940c745..740b40318 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15'; ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; +ALTER EXTENSION citus UPDATE TO '6.1-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_extension.sql 
b/src/test/regress/sql/multi_extension.sql index c375969df..4caaaaba8 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15'; ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; +ALTER EXTENSION citus UPDATE TO '6.1-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) From c154a9162197b72cb3c17354f44f8af4f5f8b785 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Mon, 28 Nov 2016 17:38:08 +0300 Subject: [PATCH 2/7] Add Regression Tests For start_metadata_sync_to_node --- .../expected/multi_metadata_snapshot.out | 167 ++++++++++++++++++ .../regress/sql/multi_metadata_snapshot.sql | 46 +++++ 2 files changed, 213 insertions(+) diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index c33daeadf..d874a0c18 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -152,4 +152,171 @@ SELECT unnest(master_metadata_snapshot()); INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') (12 rows) +-- Test start_metadata_sync_to_node UDF +-- Ensure that hasmetadata=false for all nodes +SELECT count(*) FROM pg_dist_node WHERE hasmetadata=true; + count +------- + 0 +(1 row) + +-- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------- + +(1 row) + +SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port; + nodeid | hasmetadata +--------+------------- + 1 | t +(1 row) + +-- Check that the metadata has been copied to the worker +\c - - - :worker_1_port +SELECT * FROM pg_dist_local_group; + groupid +--------- + 1 +(1 row) + +SELECT * FROM pg_dist_node ORDER BY nodeid; + nodeid | groupid | nodename | nodeport | noderack | hasmetadata +--------+---------+-----------+----------+----------+------------- + 1 | 1 | localhost | 57637 | default | f + 2 | 2 | localhost | 57638 | default | f +(2 rows) + +SELECT * FROM pg_dist_partition ORDER BY logicalrelid; + logicalrelid | partmethod | partkey | colocationid | repmodel +---------------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- + mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 0 | s +(1 row) + +SELECT * FROM pg_dist_shard ORDER BY shardid; + logicalrelid | shardid | shardstorage | 
shardminvalue | shardmaxvalue +---------------------------------+---------+--------------+---------------+--------------- + mx_testing_schema.mx_test_table | 1310000 | t | -2147483648 | -1610612737 + mx_testing_schema.mx_test_table | 1310001 | t | -1610612736 | -1073741825 + mx_testing_schema.mx_test_table | 1310002 | t | -1073741824 | -536870913 + mx_testing_schema.mx_test_table | 1310003 | t | -536870912 | -1 + mx_testing_schema.mx_test_table | 1310004 | t | 0 | 536870911 + mx_testing_schema.mx_test_table | 1310005 | t | 536870912 | 1073741823 + mx_testing_schema.mx_test_table | 1310006 | t | 1073741824 | 1610612735 + mx_testing_schema.mx_test_table | 1310007 | t | 1610612736 | 2147483647 +(8 rows) + +SELECT * FROM pg_dist_shard_placement ORDER BY shardid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1310000 | 1 | 0 | localhost | 57637 | 100000 + 1310001 | 1 | 0 | localhost | 57638 | 100001 + 1310002 | 1 | 0 | localhost | 57637 | 100002 + 1310003 | 1 | 0 | localhost | 57638 | 100003 + 1310004 | 1 | 0 | localhost | 57637 | 100004 + 1310005 | 1 | 0 | localhost | 57638 | 100005 + 1310006 | 1 | 0 | localhost | 57637 | 100006 + 1310007 | 1 | 0 | localhost | 57638 | 100007 +(8 rows) + +\d mx_testing_schema.mx_test_table + Table "mx_testing_schema.mx_test_table" + Column | Type | Modifiers +--------+---------+--------------------------------------------------------------------------------- + col_1 | integer | + col_2 | text | not null + col_3 | integer | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) +Indexes: + "mx_test_table_col_1_key" UNIQUE CONSTRAINT, btree (col_1) + "mx_index" btree (col_2) + +-- Check that pg_dist_colocation is not synced +SELECT * FROM pg_dist_colocation ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + +-- Check that repeated calls to sync_metadata has no side effects +\c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT * FROM pg_dist_local_group; + groupid +--------- + 1 +(1 row) + +SELECT * FROM pg_dist_node ORDER BY nodeid; + nodeid | groupid | nodename | nodeport | noderack | hasmetadata +--------+---------+-----------+----------+----------+------------- + 1 | 1 | localhost | 57637 | default | t + 2 | 2 | localhost | 57638 | default | f +(2 rows) + +SELECT * FROM pg_dist_partition ORDER BY logicalrelid; + logicalrelid | partmethod | partkey | colocationid | repmodel +---------------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- + mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 0 | s +(1 row) + +SELECT * FROM pg_dist_shard ORDER BY shardid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue +---------------------------------+---------+--------------+---------------+--------------- + mx_testing_schema.mx_test_table | 1310000 | t | -2147483648 | -1610612737 + 
mx_testing_schema.mx_test_table | 1310001 | t | -1610612736 | -1073741825 + mx_testing_schema.mx_test_table | 1310002 | t | -1073741824 | -536870913 + mx_testing_schema.mx_test_table | 1310003 | t | -536870912 | -1 + mx_testing_schema.mx_test_table | 1310004 | t | 0 | 536870911 + mx_testing_schema.mx_test_table | 1310005 | t | 536870912 | 1073741823 + mx_testing_schema.mx_test_table | 1310006 | t | 1073741824 | 1610612735 + mx_testing_schema.mx_test_table | 1310007 | t | 1610612736 | 2147483647 +(8 rows) + +SELECT * FROM pg_dist_shard_placement ORDER BY shardid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1310000 | 1 | 0 | localhost | 57637 | 100000 + 1310001 | 1 | 0 | localhost | 57638 | 100001 + 1310002 | 1 | 0 | localhost | 57637 | 100002 + 1310003 | 1 | 0 | localhost | 57638 | 100003 + 1310004 | 1 | 0 | localhost | 57637 | 100004 + 1310005 | 1 | 0 | localhost | 57638 | 100005 + 1310006 | 1 | 0 | localhost | 57637 | 100006 + 1310007 | 1 | 0 | localhost | 57638 | 100007 +(8 rows) + +\d mx_testing_schema.mx_test_table + Table "mx_testing_schema.mx_test_table" + Column | Type | Modifiers +--------+---------+--------------------------------------------------------------------------------- + col_1 | integer | + col_2 | text | not null + col_3 | integer | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) +Indexes: + "mx_test_table_col_1_key" UNIQUE CONSTRAINT, btree (col_1) + "mx_index" btree (col_2) + +-- Cleanup +\c - - - :worker_1_port +DROP TABLE mx_testing_schema.mx_test_table; +DELETE FROM pg_dist_node; +DELETE FROM pg_dist_partition; +DELETE FROM pg_dist_shard; +DELETE FROM pg_dist_shard_placement; +\d mx_testing_schema.mx_test_table +\c - - - :master_port +DROP TABLE mx_testing_schema.mx_test_table; +UPDATE pg_dist_node SET hasmetadata=false; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git a/src/test/regress/sql/multi_metadata_snapshot.sql b/src/test/regress/sql/multi_metadata_snapshot.sql index 8a08a4ebb..f428b8400 100644 --- a/src/test/regress/sql/multi_metadata_snapshot.sql +++ b/src/test/regress/sql/multi_metadata_snapshot.sql @@ -59,4 +59,50 @@ SELECT unnest(master_metadata_snapshot()); UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; SELECT unnest(master_metadata_snapshot()); +-- Test start_metadata_sync_to_node UDF + +-- Ensure that hasmetadata=false for all nodes +SELECT count(*) FROM pg_dist_node WHERE hasmetadata=true; + +-- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port; + +-- Check that the metadata has been copied to the worker +\c - - - :worker_1_port +SELECT * FROM pg_dist_local_group; +SELECT * FROM pg_dist_node ORDER BY nodeid; +SELECT * FROM pg_dist_partition ORDER BY logicalrelid; +SELECT * FROM pg_dist_shard ORDER BY shardid; +SELECT * FROM pg_dist_shard_placement ORDER BY shardid; +\d mx_testing_schema.mx_test_table + +-- Check that pg_dist_colocation is not synced +SELECT * FROM pg_dist_colocation ORDER BY colocationid; + +-- Check that repeated calls to sync_metadata has no side effects +\c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT start_metadata_sync_to_node('localhost', 
:worker_1_port); +\c - - - :worker_1_port +SELECT * FROM pg_dist_local_group; +SELECT * FROM pg_dist_node ORDER BY nodeid; +SELECT * FROM pg_dist_partition ORDER BY logicalrelid; +SELECT * FROM pg_dist_shard ORDER BY shardid; +SELECT * FROM pg_dist_shard_placement ORDER BY shardid; +\d mx_testing_schema.mx_test_table + +-- Cleanup +\c - - - :worker_1_port +DROP TABLE mx_testing_schema.mx_test_table; +DELETE FROM pg_dist_node; +DELETE FROM pg_dist_partition; +DELETE FROM pg_dist_shard; +DELETE FROM pg_dist_shard_placement; +\d mx_testing_schema.mx_test_table + +\c - - - :master_port +DROP TABLE mx_testing_schema.mx_test_table; +UPDATE pg_dist_node SET hasmetadata=false; + ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; From 4fd086f0afcafad9b568ae1a2cab292f698f0838 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Fri, 2 Dec 2016 10:39:11 +0300 Subject: [PATCH 3/7] Prevent Transactions in start_metadata_sync_to_node --- src/backend/distributed/metadata/metadata_sync.c | 2 ++ .../regress/expected/multi_metadata_snapshot.out | 14 +++++++++++++- src/test/regress/sql/multi_metadata_snapshot.sql | 10 +++++++++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 7f5935b78..e8baa9bfc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -73,6 +73,8 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) EnsureSuperUser(); + PreventTransactionChain(true, "start_metadata_sync_to_node"); + workerNode = FindWorkerNode(nodeNameString, nodePort); if (workerNode == NULL) diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index d874a0c18..1463b464a 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -237,7 +237,7 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; --------------+------------+-------------------+------------------------ (0 rows) --- Check that repeated calls to sync_metadata has no side effects +-- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node @@ -308,6 +308,18 @@ Indexes: "mx_test_table_col_1_key" UNIQUE CONSTRAINT, btree (col_1) "mx_index" btree (col_2) +-- Make sure that start_metadata_sync_to_node cannot be called inside a transaction +\c - - - :master_port +BEGIN; +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); +ERROR: start_metadata_sync_to_node cannot run inside a transaction block +ROLLBACK; +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; + hasmetadata +------------- + f +(1 row) + -- Cleanup \c - - - :worker_1_port DROP TABLE mx_testing_schema.mx_test_table; diff --git a/src/test/regress/sql/multi_metadata_snapshot.sql b/src/test/regress/sql/multi_metadata_snapshot.sql index f428b8400..b5251b2f2 100644 --- a/src/test/regress/sql/multi_metadata_snapshot.sql +++ b/src/test/regress/sql/multi_metadata_snapshot.sql @@ -80,7 +80,7 @@ SELECT * FROM pg_dist_shard_placement ORDER BY shardid; -- Check that pg_dist_colocation is not synced SELECT * FROM pg_dist_colocation ORDER BY colocationid; --- Check that repeated calls to sync_metadata has no side effects +-- Check that repeated calls to start_metadata_sync_to_node has no side effects 
\c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -92,6 +92,14 @@ SELECT * FROM pg_dist_shard ORDER BY shardid; SELECT * FROM pg_dist_shard_placement ORDER BY shardid; \d mx_testing_schema.mx_test_table +-- Make sure that start_metadata_sync_to_node cannot be called inside a transaction +\c - - - :master_port +BEGIN; +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); +ROLLBACK; + +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; + -- Cleanup \c - - - :worker_1_port DROP TABLE mx_testing_schema.mx_test_table; From 5e96e4f60e0c334fb5fd3d25cfda7b22a6c34135 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Fri, 2 Dec 2016 14:41:48 +0300 Subject: [PATCH 4/7] Make truncate triggers propagated on start_metadata_sync_to_node call --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-1--6.1-2.sql | 12 +++++ src/backend/distributed/citus.control | 2 +- .../commands/create_distributed_table.c | 3 +- .../distributed/metadata/metadata_sync.c | 23 ++++++++++ .../worker/worker_truncate_trigger_protocol.c | 46 +++++++++++++++++++ .../distributed/master_metadata_utility.h | 1 + src/test/regress/expected/multi_extension.out | 1 + .../expected/multi_metadata_snapshot.out | 34 ++++++++++---- src/test/regress/sql/multi_extension.sql | 1 + .../regress/sql/multi_metadata_snapshot.sql | 4 ++ 11 files changed, 119 insertions(+), 12 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-1--6.1-2.sql create mode 100644 src/backend/distributed/worker/worker_truncate_trigger_protocol.c diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 9b4ace629..2a7171c17 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 + 6.1-1 6.1-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -97,6 +97,8 @@ $(EXTENSION)--6.0-18.sql: $(EXTENSION)--6.0-17.sql $(EXTENSION)--6.0-17--6.0-18. 
cat $^ > $@ $(EXTENSION)--6.1-1.sql: $(EXTENSION)--6.0-18.sql $(EXTENSION)--6.0-18--6.1-1.sql cat $^ > $@ +$(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-1--6.1-2.sql b/src/backend/distributed/citus--6.1-1--6.1-2.sql new file mode 100644 index 000000000..af87cac17 --- /dev/null +++ b/src/backend/distributed/citus--6.1-1--6.1-2.sql @@ -0,0 +1,12 @@ +/* citus--6.1-1--6.1-2.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION worker_create_truncate_trigger(table_name regclass) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_create_truncate_trigger$$; +COMMENT ON FUNCTION worker_create_truncate_trigger(tablename regclass) + IS 'create truncate trigger for distributed table'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 74ed4aa9e..1f7ea03ac 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-1' +default_version = '6.1-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3247d3c98..b59add3fe 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -76,7 +76,6 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, uint32 colocationId); static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId); -static void CreateTruncateTrigger(Oid relationId); static uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType); static uint32 GetNextColocationId(void); @@ -823,7 +822,7 @@ LocalTableEmpty(Oid tableId) * CreateTruncateTrigger creates a truncate trigger on table identified by relationId * and assigns citus_truncate_trigger() as handler. 
*/ -static void +void CreateTruncateTrigger(Oid relationId) { CreateTrigStmt *trigger = NULL; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e8baa9bfc..97278d3fd 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -44,6 +44,7 @@ static char * LocalGroupIdUpdateCommand(uint32 groupId); static void MarkNodeHasMetadata(char *nodeName, int32 nodePort); +static char * TruncateTriggerCreateCommand(Oid relationId); PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); @@ -485,6 +486,7 @@ GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry) { char *ownerResetCommand = NULL; char *metadataCommand = NULL; + char *truncateTriggerCreateCommand = NULL; Oid relationId = cacheEntry->relationId; List *commandList = GetTableDDLEvents(relationId); @@ -495,6 +497,9 @@ GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry) metadataCommand = DistributionCreateCommand(cacheEntry); commandList = lappend(commandList, metadataCommand); + truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId); + commandList = lappend(commandList, truncateTriggerCreateCommand); + return commandList; } @@ -570,3 +575,21 @@ MarkNodeHasMetadata(char *nodeName, int32 nodePort) systable_endscan(scanDescriptor); heap_close(pgDistNode, NoLock); } + + +/* + * TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger + * function, which creates the truncate trigger on the worker. + */ +static char * +TruncateTriggerCreateCommand(Oid relationId) +{ + StringInfo triggerCreateCommand = makeStringInfo(); + char *tableName = generate_qualified_relation_name(relationId); + + appendStringInfo(triggerCreateCommand, + "SELECT worker_create_truncate_trigger(%s)", + quote_literal_cstr(tableName)); + + return triggerCreateCommand->data; +} diff --git a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c new file mode 100644 index 000000000..237768e70 --- /dev/null +++ b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c @@ -0,0 +1,46 @@ +/*------------------------------------------------------------------------- + * + * worker_truncate_trigger_protocol.c + * + * Routines for creating truncate triggers on distributed tables on worker nodes. + * + * Copyright (c) 2016, Citus Data, Inc. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" + +#include "distributed/citus_ruleutils.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" +#include "utils/elog.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" + + +PG_FUNCTION_INFO_V1(worker_create_truncate_trigger); + + +/* + * worker_create_truncate_trigger creates a truncate trigger for the given distributed + * table on the current metadata worker. The function is intended to be called by the + * coordinator node during metadata propagation of mx tables or during the upgrades from + * citus version <=5.2 to >=6.1. The function requires superuser permissions. 
+ */ +Datum +worker_create_truncate_trigger(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + + EnsureSuperUser(); + + /* Create the truncate trigger */ + CreateTruncateTrigger(relationId); + + PG_RETURN_VOID(); +} diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 3881e8d61..3a8a62670 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -80,6 +80,7 @@ extern void DeleteShardRow(uint64 shardId); extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort); +extern void CreateTruncateTrigger(Oid relationId); /* Remaining metadata utility functions */ extern char * TableOwner(Oid relationId); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 740b40318..23d9e2244 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -59,6 +59,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; +ALTER EXTENSION citus UPDATE TO '6.1-2'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index 1463b464a..3e6954900 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -61,9 +61,10 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's') + SELECT worker_create_truncate_trigger('public.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(10 rows) +(11 rows) -- Show that CREATE INDEX commands are included in the metadata snapshot CREATE INDEX mx_index ON mx_test_table(col_2); @@ -81,9 +82,10 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE 
public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's') + SELECT worker_create_truncate_trigger('public.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(11 rows) +(12 rows) -- Show that schema changes are included in the metadata snapshot CREATE SCHEMA mx_testing_schema; @@ -103,9 +105,10 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') + SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(12 rows) +(13 rows) -- Show that append distributed tables are not included in the metadata snapshot 
CREATE TABLE non_mx_test_table (col_1 int, col_2 text); @@ -129,9 +132,10 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') + SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(12 rows) +(13 rows) -- Show that range distributed tables are not included in the metadata snapshot UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; @@ -148,9 +152,10 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') + SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', 
'536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(12 rows) +(13 rows) -- Test start_metadata_sync_to_node UDF -- Ensure that hasmetadata=false for all nodes @@ -163,7 +168,7 @@ SELECT count(*) FROM pg_dist_node WHERE hasmetadata=true; -- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ------------------------ +----------------------------- (1 row) @@ -237,17 +242,24 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; --------------+------------+-------------------+------------------------ (0 rows) +-- Make sure that truncate trigger has been set for the MX table on worker +SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; + count +------- + 1 +(1 row) + -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ------------------------ +----------------------------- (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ------------------------ +----------------------------- (1 row) @@ -308,6 +320,12 @@ Indexes: "mx_test_table_col_1_key" UNIQUE CONSTRAINT, btree (col_1) "mx_index" btree (col_2) +SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; + count +------- + 1 +(1 row) + -- Make sure that start_metadata_sync_to_node cannot be called inside a transaction \c - - - :master_port BEGIN; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 4caaaaba8..d00abae7a 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -59,6 +59,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; +ALTER EXTENSION citus UPDATE TO '6.1-2'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_metadata_snapshot.sql b/src/test/regress/sql/multi_metadata_snapshot.sql index b5251b2f2..d760c039c 100644 --- a/src/test/regress/sql/multi_metadata_snapshot.sql +++ b/src/test/regress/sql/multi_metadata_snapshot.sql @@ -80,6 +80,9 @@ SELECT * FROM pg_dist_shard_placement ORDER BY shardid; -- Check that pg_dist_colocation is not synced SELECT * FROM pg_dist_colocation ORDER BY colocationid; +-- Make sure that truncate trigger has been set for the MX table on worker +SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; + -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -91,6 +94,7 @@ SELECT * FROM pg_dist_partition ORDER BY logicalrelid; SELECT * FROM pg_dist_shard ORDER BY shardid; SELECT * FROM pg_dist_shard_placement ORDER BY shardid; \d mx_testing_schema.mx_test_table +SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; -- Make sure that start_metadata_sync_to_node cannot be called inside a transaction \c - - - :master_port From 
fb08093b00ac7b7e918e5b65241c03413728e85e Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Mon, 5 Dec 2016 15:40:24 +0300 Subject: [PATCH 5/7] Make start_metadata_sync_to_node UDF to propagate foreign-key constraints --- .../distributed/metadata/metadata_sync.c | 92 +++++++++++-------- .../distributed/worker/worker_drop_protocol.c | 8 +- src/include/distributed/metadata_sync.h | 1 - .../expected/multi_metadata_snapshot.out | 44 +++++++++ .../regress/sql/multi_metadata_snapshot.sql | 27 ++++++ 5 files changed, 130 insertions(+), 42 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 97278d3fd..7e2d3c2e2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -158,6 +158,7 @@ MetadataCreateCommands(void) { List *metadataSnapshotCommandList = NIL; List *distributedTableList = DistributedTableList(); + List *mxTableList = NIL; List *workerNodeList = WorkerNodeList(); ListCell *distributedTableCell = NULL; char *nodeListInsertCommand = NULL; @@ -167,26 +168,67 @@ MetadataCreateCommands(void) metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, nodeListInsertCommand); - /* iterate over the distributed tables */ + /* create the list of mx tables */ foreach(distributedTableCell, distributedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); - List *clusteredTableDDLEvents = NIL; + if (ShouldSyncTableMetadata(cacheEntry->relationId)) + { + mxTableList = lappend(mxTableList, cacheEntry); + } + } + + /* create the mx tables, but not the metadata */ + foreach(distributedTableCell, mxTableList) + { + DistTableCacheEntry *cacheEntry = + (DistTableCacheEntry *) lfirst(distributedTableCell); + Oid relationId = cacheEntry->relationId; + + List *commandList = GetTableDDLEvents(relationId); + char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); + + metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, + commandList); + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, + tableOwnerResetCommand); + } + + /* construct the foreign key constraints after all tables are created */ + foreach(distributedTableCell, mxTableList) + { + DistTableCacheEntry *cacheEntry = + (DistTableCacheEntry *) lfirst(distributedTableCell); + + List *foreignConstraintCommands = + GetTableForeignConstraintCommands(cacheEntry->relationId); + + metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, + foreignConstraintCommands); + } + + /* after all tables are created, create the metadata */ + foreach(distributedTableCell, mxTableList) + { + DistTableCacheEntry *cacheEntry = + (DistTableCacheEntry *) lfirst(distributedTableCell); List *shardIntervalList = NIL; List *shardCreateCommandList = NIL; + char *metadataCommand = NULL; + char *truncateTriggerCreateCommand = NULL; Oid clusteredTableId = cacheEntry->relationId; - /* add only clustered tables */ - if (!ShouldSyncTableMetadata(clusteredTableId)) - { - continue; - } + /* add the table metadata command first*/ + metadataCommand = DistributionCreateCommand(cacheEntry); + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, + metadataCommand); - /* add the DDL events first */ - clusteredTableDDLEvents = GetDistributedTableDDLEvents(cacheEntry); - metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - clusteredTableDDLEvents); + /* add the truncate trigger command after the table became 
distributed */ + truncateTriggerCreateCommand = + TruncateTriggerCreateCommand(cacheEntry->relationId); + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, + truncateTriggerCreateCommand); /* add the pg_dist_shard{,placement} entries */ shardIntervalList = LoadShardIntervalList(clusteredTableId); @@ -476,34 +518,6 @@ NodeDeleteCommand(uint32 nodeId) } -/* - * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to - * create this relation on a worker. This includes setting up any sequences, - * setting the owner of the table, and inserting into metadata tables. - */ -List * -GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry) -{ - char *ownerResetCommand = NULL; - char *metadataCommand = NULL; - char *truncateTriggerCreateCommand = NULL; - Oid relationId = cacheEntry->relationId; - - List *commandList = GetTableDDLEvents(relationId); - - ownerResetCommand = TableOwnerResetCommand(relationId); - commandList = lappend(commandList, ownerResetCommand); - - metadataCommand = DistributionCreateCommand(cacheEntry); - commandList = lappend(commandList, metadataCommand); - - truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId); - commandList = lappend(commandList, truncateTriggerCreateCommand); - - return commandList; -} - - /* * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id * of a worker and returns the command in a string. diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 6876d090f..077f78dc9 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -42,6 +42,8 @@ static void DeletePartitionRow(Oid distributedRelationId); * not dropped as in the case of "DROP TABLE distributed_table;" command. * * The function errors out if the input relation Oid is not a regular or foreign table. + * The function is meant to be called only by the coordinator, therefore requires + * superuser privileges. 
*/ Datum worker_drop_distributed_table(PG_FUNCTION_ARGS) @@ -55,6 +57,8 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) ListCell *shardCell = NULL; char relationKind = '\0'; + EnsureSuperUser(); + /* first check the relation type */ distributedRelation = relation_open(relationId, AccessShareLock); relationKind = distributedRelation->rd_rel->relkind; @@ -96,8 +100,8 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) } else { - /* drop the table only */ - performDeletion(&distributedTableObject, DROP_RESTRICT, + /* drop the table with cascade since other tables may be referring to it */ + performDeletion(&distributedTableObject, DROP_CASCADE, PERFORM_DELETION_INTERNAL); } diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index c0c2a5edb..94efd573a 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -28,7 +28,6 @@ extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); extern char * NodeDeleteCommand(uint32 nodeId); -extern List * GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node" diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index 3e6954900..b8caa16c7 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -249,6 +249,50 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': 1 (1 row) +-- Make sure that start_metadata_sync_to_node considers foreign key constraints +SET citus.shard_replication_factor TO 1; +CREATE SCHEMA mx_testing_schema_2; +CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); +CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, + FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3)); +SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +UPDATE + pg_dist_partition SET repmodel='s' +WHERE + logicalrelid='mx_testing_schema.fk_test_1'::regclass + OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass; + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +-- Check that foreign key metadata exists on the worker +\c - - - :worker_1_port +\d mx_testing_schema_2.fk_test_2 +Table "mx_testing_schema_2.fk_test_2" + Column | Type | Modifiers +--------+---------+----------- + col1 | integer | + col2 | integer | + col3 | text | +Foreign-key constraints: + "fk_test_2_col1_fkey" FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3) + +\c - - - :master_port +RESET citus.shard_replication_factor; -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); diff --git a/src/test/regress/sql/multi_metadata_snapshot.sql b/src/test/regress/sql/multi_metadata_snapshot.sql index d760c039c..91c091b18 100644 --- a/src/test/regress/sql/multi_metadata_snapshot.sql +++ 
b/src/test/regress/sql/multi_metadata_snapshot.sql @@ -83,6 +83,33 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; -- Make sure that truncate trigger has been set for the MX table on worker SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; +-- Make sure that start_metadata_sync_to_node considers foreign key constraints +SET citus.shard_replication_factor TO 1; + +CREATE SCHEMA mx_testing_schema_2; + +CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); +CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, + FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3)); + +SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1'); +SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); + +UPDATE + pg_dist_partition SET repmodel='s' +WHERE + logicalrelid='mx_testing_schema.fk_test_1'::regclass + OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass; + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +-- Check that foreign key metadata exists on the worker +\c - - - :worker_1_port +\d mx_testing_schema_2.fk_test_2 +\c - - - :master_port + +RESET citus.shard_replication_factor; + -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); From b94647c3bcbde661ec37fee9ff0d84c37cadd605 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Mon, 5 Dec 2016 23:01:30 +0300 Subject: [PATCH 6/7] Propagate CREATE SCHEMA commands with the correct AUTHORIZATION clause in start_metadata_sync_to_node --- .../master/master_metadata_utility.c | 2 + .../distributed/master/master_node_protocol.c | 51 ++++++++++++++++++- src/include/distributed/master_protocol.h | 2 +- .../expected/multi_generate_ddl_commands.out | 6 +-- .../expected/multi_metadata_snapshot.out | 6 +-- 5 files changed, 59 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 51f87554a..03824c008 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -15,9 +15,11 @@ #include "miscadmin.h" #include "access/htup_details.h" +#include "access/sysattr.h" #include "access/xact.h" #include "catalog/indexing.h" #include "catalog/pg_type.h" +#include "catalog/pg_namespace.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 990d11e2a..cf7a2e36f 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -26,6 +26,7 @@ #include "access/htup_details.h" #include "access/skey.h" #include "access/stratnum.h" +#include "access/sysattr.h" #include "access/tupdesc.h" #include "catalog/dependency.h" #include "catalog/indexing.h" @@ -37,6 +38,7 @@ #endif #include "catalog/pg_index.h" #include "catalog/pg_type.h" +#include "catalog/pg_namespace.h" #include "commands/sequence.h" #include "distributed/citus_ruleutils.h" #include "distributed/listutils.h" @@ -56,6 +58,7 @@ #include "utils/palloc.h" #include "utils/relcache.h" #include "utils/ruleutils.h" +#include "utils/tqual.h" /* Shard related configuration */ @@ -66,6 +69,7 
@@ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); +static char * SchemaOwner(Oid schemaId); /* exports for SQL callable functions */ @@ -633,7 +637,8 @@ GetTableDDLEvents(Oid relationId) if (strncmp(schemaName, "public", NAMEDATALEN) != 0) { StringInfo schemaNameDef = makeStringInfo(); - appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, schemaName); + char *ownerName = SchemaOwner(schemaId); + appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, schemaName, ownerName); tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data); } @@ -855,3 +860,47 @@ WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor) return workerNodeDatum; } + + +/* + * SchemaOwner returns the name of the owner of the specified schema. + */ +char * +SchemaOwner(Oid schemaId) +{ + const int scanKeyCount = 1; + + Relation namespaceRelation = heap_open(NamespaceRelationId, AccessShareLock); + ScanKeyData scanKeyData[scanKeyCount]; + SysScanDesc scanDescriptor = NULL; + HeapTuple tuple = NULL; + char *ownerName = NULL; + + /* start scan */ + ScanKeyInit(&scanKeyData[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(schemaId)); + + scanDescriptor = systable_beginscan(namespaceRelation, NamespaceOidIndexId, true, + SnapshotSelf, 1, &scanKeyData[0]); + tuple = systable_getnext(scanDescriptor); + + if (HeapTupleIsValid(tuple)) + { + Form_pg_namespace nsptup = (Form_pg_namespace) GETSTRUCT(tuple); + Oid ownerId = nsptup->nspowner; + + ownerName = GetUserNameFromId(ownerId, false); + } + else + { + /* if the schema is not found, then return the name of current user */ + ownerName = GetUserNameFromId(GetUserId(), false); + } + + systable_endscan(scanDescriptor); + heap_close(namespaceRelation, NoLock); + + return ownerName; +} diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 035f60f7b..4be511b04 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -66,7 +66,7 @@ #define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)" #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s CASCADE" #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE" -#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s" +#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s" #define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')" #define FINALIZED_SHARD_PLACEMENTS_QUERY \ "SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld" diff --git a/src/test/regress/expected/multi_generate_ddl_commands.out b/src/test/regress/expected/multi_generate_ddl_commands.out index 1594b6361..f1a82c0da 100644 --- a/src/test/regress/expected/multi_generate_ddl_commands.out +++ b/src/test/regress/expected/multi_generate_ddl_commands.out @@ -36,9 +36,9 @@ SELECT table_ddl_command_array('not_null_table'); -- ensure tables not in search path are schema-prefixed CREATE SCHEMA not_in_path CREATE TABLE simple_table (id bigint); SELECT table_ddl_command_array('not_in_path.simple_table'); - table_ddl_command_array -------------------------------------------------------------------------------------------------- - {"CREATE SCHEMA IF NOT EXISTS not_in_path","CREATE TABLE not_in_path.simple_table (id bigint)"} + table_ddl_command_array 
+------------------------------------------------------------------------------------------------------------------------ + {"CREATE SCHEMA IF NOT EXISTS not_in_path AUTHORIZATION postgres","CREATE TABLE not_in_path.simple_table (id bigint)"} (1 row) -- even more complex constraints should be preserved... diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index b8caa16c7..a5995e64c 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -98,7 +98,7 @@ SELECT unnest(master_metadata_snapshot()); TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) - CREATE SCHEMA IF NOT EXISTS mx_testing_schema + CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) @@ -125,7 +125,7 @@ SELECT unnest(master_metadata_snapshot()); TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) - CREATE SCHEMA IF NOT EXISTS mx_testing_schema + CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) @@ -145,7 +145,7 @@ SELECT unnest(master_metadata_snapshot()); TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) - CREATE SCHEMA IF NOT EXISTS mx_testing_schema + CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) From afbb5ffb318f0b1d6b0164d9ccc944d3041941e9 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Mon, 12 Dec 2016 14:59:01 +0300 Subject: [PATCH 7/7] Add stop_metadata_sync_to_node UDF --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-2--6.1-3.sql | 12 ++++++ src/backend/distributed/citus.control | 2 +- .../distributed/metadata/metadata_sync.c | 37 
+++++++++++++++-- src/test/regress/expected/multi_extension.out | 1 + .../expected/multi_metadata_snapshot.out | 41 ++++++++++++++++++- src/test/regress/sql/multi_extension.sql | 1 + .../regress/sql/multi_metadata_snapshot.sql | 12 +++++- 8 files changed, 100 insertions(+), 10 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-2--6.1-3.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 2a7171c17..84801a1c5 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 + 6.1-1 6.1-2 6.1-3 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -99,6 +99,8 @@ $(EXTENSION)--6.1-1.sql: $(EXTENSION)--6.0-18.sql $(EXTENSION)--6.0-18--6.1-1.sq cat $^ > $@ $(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql cat $^ > $@ +$(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-2--6.1-3.sql b/src/backend/distributed/citus--6.1-2--6.1-3.sql new file mode 100644 index 000000000..1f786125f --- /dev/null +++ b/src/backend/distributed/citus--6.1-2--6.1-3.sql @@ -0,0 +1,12 @@ +/* citus--6.1-2--6.1-3.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION stop_metadata_sync_to_node(nodename text, nodeport integer) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$stop_metadata_sync_to_node$$; +COMMENT ON FUNCTION stop_metadata_sync_to_node(nodename text, nodeport integer) + IS 'stop metadata sync to node'; + +RESET search_path; \ No newline at end of file diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 1f7ea03ac..27e416df9 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-2' +default_version = '6.1-3' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 7e2d3c2e2..643b6102c 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -43,11 +43,12 @@ static char * LocalGroupIdUpdateCommand(uint32 groupId); -static void MarkNodeHasMetadata(char *nodeName, int32 nodePort); +static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); static char * TruncateTriggerCreateCommand(Oid relationId); PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); +PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); /* @@ -111,7 +112,35 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner, recreateMetadataSnapshotCommandList); - MarkNodeHasMetadata(nodeNameString, nodePort); + MarkNodeHasMetadata(nodeNameString, nodePort, true); + + PG_RETURN_VOID(); +} + + +/* + * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node + * to false in pg_dist_node table, thus indicating that the specified worker node does not + * receive DDL changes anymore and cannot be 
used for issuing mx queries. + */ +Datum +stop_metadata_sync_to_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeNameString = text_to_cstring(nodeName); + WorkerNode *workerNode = NULL; + + EnsureSuperUser(); + + workerNode = FindWorkerNode(nodeNameString, nodePort); + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node (%s,%d) does not exist", nodeNameString, nodePort))); + } + + MarkNodeHasMetadata(nodeNameString, nodePort, false); PG_RETURN_VOID(); } @@ -539,7 +568,7 @@ LocalGroupIdUpdateCommand(uint32 groupId) * pg_dist_node to true. */ static void -MarkNodeHasMetadata(char *nodeName, int32 nodePort) +MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata) { const bool indexOK = false; const int scanKeyCount = 2; @@ -573,7 +602,7 @@ MarkNodeHasMetadata(char *nodeName, int32 nodePort) memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(true); + values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata); isnull[Anum_pg_dist_node_hasmetadata - 1] = false; replace[Anum_pg_dist_node_hasmetadata - 1] = true; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 23d9e2244..237a6808d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -60,6 +60,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; +ALTER EXTENSION citus UPDATE TO '6.1-3'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index a5995e64c..f3ef9c6aa 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -382,6 +382,32 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; f (1 row) +-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false +\c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; + hasmetadata +------------- + t +(1 row) + +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; + hasmetadata +------------- + f +(1 row) + -- Cleanup \c - - - :worker_1_port DROP TABLE mx_testing_schema.mx_test_table; @@ -391,6 +417,17 @@ DELETE FROM pg_dist_shard; DELETE FROM pg_dist_shard_placement; \d mx_testing_schema.mx_test_table \c - - - :master_port -DROP TABLE mx_testing_schema.mx_test_table; -UPDATE pg_dist_node SET hasmetadata=false; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +DROP TABLE mx_testing_schema.mx_test_table CASCADE; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git 
a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index d00abae7a..c67ec9a6d 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -60,6 +60,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; +ALTER EXTENSION citus UPDATE TO '6.1-3'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_metadata_snapshot.sql b/src/test/regress/sql/multi_metadata_snapshot.sql index 91c091b18..63869e110 100644 --- a/src/test/regress/sql/multi_metadata_snapshot.sql +++ b/src/test/regress/sql/multi_metadata_snapshot.sql @@ -131,6 +131,13 @@ ROLLBACK; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; +-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false +\c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; + -- Cleanup \c - - - :worker_1_port DROP TABLE mx_testing_schema.mx_test_table; @@ -141,7 +148,8 @@ DELETE FROM pg_dist_shard_placement; \d mx_testing_schema.mx_test_table \c - - - :master_port -DROP TABLE mx_testing_schema.mx_test_table; -UPDATE pg_dist_node SET hasmetadata=false; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +DROP TABLE mx_testing_schema.mx_test_table CASCADE; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
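
A minimal sketch of the schema-ownership propagation introduced in PATCH 6/7. The role, schema, and table names below are hypothetical; create_distributed_table() and master_metadata_snapshot() are the functions exercised by the regression tests in this series, and the table is assumed to be an MX table (streaming replication model) so that its DDL shows up in the metadata snapshot:

    CREATE ROLE app_owner;
    CREATE SCHEMA app_schema AUTHORIZATION app_owner;
    CREATE TABLE app_schema.events (id bigint, payload text);
    SELECT create_distributed_table('app_schema.events', 'id');

    -- With this series applied, the schema DDL generated for workers is
    -- expected to carry the owner, e.g.:
    --   CREATE SCHEMA IF NOT EXISTS app_schema AUTHORIZATION app_owner
    SELECT unnest(master_metadata_snapshot());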
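
A minimal operator-facing sketch of the two sync UDFs added by this series, assuming the coordinator has been upgraded to extension version 6.1-3, the statements are run as superuser, and the worker was previously registered with master_add_node (the host name and port are placeholders):

    ALTER EXTENSION citus UPDATE TO '6.1-3';

    -- Sync metadata to the worker; this sets pg_dist_node.hasmetadata to true.
    SELECT start_metadata_sync_to_node('worker-1.example.com', 5432);
    SELECT hasmetadata FROM pg_dist_node
    WHERE nodename = 'worker-1.example.com' AND nodeport = 5432;

    -- Stop propagating metadata changes; hasmetadata is set back to false.
    SELECT stop_metadata_sync_to_node('worker-1.example.com', 5432);
    SELECT hasmetadata FROM pg_dist_node
    WHERE nodename = 'worker-1.example.com' AND nodeport = 5432;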