From 9eff968d1f4a64f6de762742b07eb2d388c3c158 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Tue, 8 Nov 2016 16:35:41 -0800 Subject: [PATCH] Add start_metadata_sync_to_node UDF This change adds `start_metadata_sync_to_node` UDF which copies the metadata about nodes and MX tables from master to the specified worker, sets its local group ID and marks its hasmetadata to true to allow it receive future DDL changes. --- src/backend/distributed/Makefile | 5 +- .../distributed/citus--6.0-18--6.1-1.sql | 12 ++ src/backend/distributed/citus.control | 2 +- .../distributed/metadata/metadata_sync.c | 152 ++++++++++++++++++ src/include/distributed/metadata_cache.h | 1 - src/test/regress/expected/multi_extension.out | 1 + src/test/regress/sql/multi_extension.sql | 1 + 7 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/citus--6.0-18--6.1-1.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index ee5b115ef..9b4ace629 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,8 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ + 6.1-1 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -94,6 +95,8 @@ $(EXTENSION)--6.0-17.sql: $(EXTENSION)--6.0-16.sql $(EXTENSION)--6.0-16--6.0-17. 
cat $^ > $@ $(EXTENSION)--6.0-18.sql: $(EXTENSION)--6.0-17.sql $(EXTENSION)--6.0-17--6.0-18.sql cat $^ > $@ +$(EXTENSION)--6.1-1.sql: $(EXTENSION)--6.0-18.sql $(EXTENSION)--6.0-18--6.1-1.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-18--6.1-1.sql b/src/backend/distributed/citus--6.0-18--6.1-1.sql new file mode 100644 index 000000000..44660e753 --- /dev/null +++ b/src/backend/distributed/citus--6.0-18--6.1-1.sql @@ -0,0 +1,12 @@ +/* citus--6.0-18--6.1-1.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$start_metadata_sync_to_node$$; +COMMENT ON FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer) + IS 'sync metadata to node'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 63d1859f1..74ed4aa9e 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-18' +default_version = '6.1-1' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 11f794e0c..7f5935b78 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -17,8 +17,12 @@ #include #include +#include "access/genam.h" #include "access/heapam.h" +#include "access/htup_details.h" +#include "access/xact.h" #include "catalog/dependency.h" +#include "catalog/indexing.h" #include "catalog/pg_foreign_server.h" #include "distributed/citus_ruleutils.h" #include "distributed/distribution_column.h" @@ -27,14 +31,89 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" +#include 
"distributed/pg_dist_node.h"
 #include "distributed/worker_manager.h"
+#include "distributed/worker_transaction.h"
 #include "foreign/foreign.h"
 #include "nodes/pg_list.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"


+static char * LocalGroupIdUpdateCommand(uint32 groupId);
+static void MarkNodeHasMetadata(char *nodeName, int32 nodePort);
+
+
+PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
+
+
+/*
+ * start_metadata_sync_to_node function creates the metadata in a worker for preparing the
+ * worker for accepting MX-table queries. The function first sets the localGroupId of the
+ * worker so that the worker knows which tuple in pg_dist_node table represents itself.
+ * After that, SQL statements for dropping and re-creating metadata about mx distributed
+ * tables are sent to the worker. Finally, the hasmetadata column of the target node in
+ * pg_dist_node is marked as true. Superuser only; errors out for an unknown node.
+ */
+Datum
+start_metadata_sync_to_node(PG_FUNCTION_ARGS)
+{
+	text *nodeName = PG_GETARG_TEXT_P(0);
+	int32 nodePort = PG_GETARG_INT32(1);
+	char *nodeNameString = text_to_cstring(nodeName);
+	char *extensionOwner = CitusExtensionOwnerName();
+
+	WorkerNode *workerNode = NULL;
+	char *localGroupIdUpdateCommand = NULL;
+	List *recreateMetadataSnapshotCommandList = NIL;
+	List *dropMetadataCommandList = NIL;
+	List *createMetadataCommandList = NIL;
+
+	EnsureSuperUser();
+
+	workerNode = FindWorkerNode(nodeNameString, nodePort);
+
+	if (workerNode == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("you cannot sync metadata to a non-existent node"),
+						errhint("First, add the node with SELECT master_add_node(%s,%d)",
+								quote_literal_cstr(nodeNameString), nodePort)));
+	}
+
+	/* generate and add the local group id's update query */
+	localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
+
+	/* generate the queries which drop the metadata */
+	dropMetadataCommandList = MetadataDropCommands();
+
+	/* generate the queries which create the metadata from scratch */
+	createMetadataCommandList = MetadataCreateCommands();
+
+	recreateMetadataSnapshotCommandList = lappend(recreateMetadataSnapshotCommandList,
+												  localGroupIdUpdateCommand);
+	recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
+													  dropMetadataCommandList);
+	recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
+													  createMetadataCommandList);
+
+	/*
+	 * Send the snapshot recreation commands in a single remote transaction and
+	 * error out in any kind of failure. The list holds the local group id update,
+	 * then the metadata drop commands, then the metadata create commands, and the
+	 * node is marked as having metadata only after this call has returned.
+	 */
+	SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
+											   recreateMetadataSnapshotCommandList);
+
+	MarkNodeHasMetadata(nodeNameString, nodePort);
+
+	PG_RETURN_VOID();
+}
+
+
 /*
  * ShouldSyncTableMetadata checks if a distributed table has streaming replication model
  * and hash distribution. In that case the distributed table is considered an MX table,
@@ -416,3 +495,76 @@ GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry)

 	return commandList;
 }
+
+
+/*
+ * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
+ * of a worker and returns the command in a string (groupId is unsigned, hence %u).
+ */
+static char *
+LocalGroupIdUpdateCommand(uint32 groupId)
+{
+	StringInfo updateCommand = makeStringInfo();
+
+	appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %u",
+					 groupId);
+
+	return updateCommand->data;
+}
+
+
+/*
+ * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in
+ * pg_dist_node to true.
+ */
+static void
+MarkNodeHasMetadata(char *nodeName, int32 nodePort)
+{
+	const bool indexOK = false;
+	const int scanKeyCount = 2;
+
+	Relation pgDistNode = NULL;
+	TupleDesc tupleDescriptor = NULL;
+	ScanKeyData scanKey[scanKeyCount];
+	SysScanDesc scanDescriptor = NULL;
+	HeapTuple heapTuple = NULL;
+	Datum values[Natts_pg_dist_node];
+	bool isnull[Natts_pg_dist_node];
+	bool replace[Natts_pg_dist_node];
+
+	pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
+	tupleDescriptor = RelationGetDescr(pgDistNode);
+
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
+				BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
+	ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
+				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort)); /* int4 datum */
+
+	scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
+										NULL, scanKeyCount, scanKey);
+
+	heapTuple = systable_getnext(scanDescriptor);
+	if (!HeapTupleIsValid(heapTuple))
+	{
+		ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
+							   nodeName, nodePort)));
+	}
+
+	memset(replace, 0, sizeof(replace)); /* only hasmetadata is flagged for replacement */
+
+	values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(true);
+	isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
+	replace[Anum_pg_dist_node_hasmetadata - 1] = true;
+
+	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
+	simple_heap_update(pgDistNode, &heapTuple->t_self, heapTuple);
+
+	CatalogUpdateIndexes(pgDistNode, heapTuple);
+
+	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
+
+	CommandCounterIncrement();
+
+	systable_endscan(scanDescriptor);
+	heap_close(pgDistNode, NoLock); /* closed with NoLock: the lock is not released here */
+}
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index a1c946eb7..4b8753694 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -59,7 +59,6 @@ extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationI
 extern int
GetLocalGroupId(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); -extern void CitusInvalidateNodeCache(void); extern bool CitusHasBeenLoaded(void); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 16940c745..740b40318 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15'; ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; +ALTER EXTENSION citus UPDATE TO '6.1-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index c375969df..4caaaaba8 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15'; ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; +ALTER EXTENSION citus UPDATE TO '6.1-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)