diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index ee5b115ef..9b4ace629 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,8 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ + 6.1-1 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -94,6 +95,8 @@ $(EXTENSION)--6.0-17.sql: $(EXTENSION)--6.0-16.sql $(EXTENSION)--6.0-16--6.0-17. cat $^ > $@ $(EXTENSION)--6.0-18.sql: $(EXTENSION)--6.0-17.sql $(EXTENSION)--6.0-17--6.0-18.sql cat $^ > $@ +$(EXTENSION)--6.1-1.sql: $(EXTENSION)--6.0-18.sql $(EXTENSION)--6.0-18--6.1-1.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-18--6.1-1.sql b/src/backend/distributed/citus--6.0-18--6.1-1.sql new file mode 100644 index 000000000..44660e753 --- /dev/null +++ b/src/backend/distributed/citus--6.0-18--6.1-1.sql @@ -0,0 +1,12 @@ +/* citus--6.0-18--6.1-1.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$start_metadata_sync_to_node$$; +COMMENT ON FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer) + IS 'sync metadata to node'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 63d1859f1..74ed4aa9e 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-18' 
+default_version = '6.1-1'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 11f794e0c..7f5935b78 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -17,8 +17,12 @@
 #include
 #include
 
+#include "access/genam.h"
 #include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
 #include "catalog/dependency.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_foreign_server.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/distribution_column.h"
@@ -27,14 +31,89 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_join_order.h"
+#include "distributed/pg_dist_node.h"
 #include "distributed/worker_manager.h"
+#include "distributed/worker_transaction.h"
 #include "foreign/foreign.h"
 #include "nodes/pg_list.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 
 
+static char * LocalGroupIdUpdateCommand(uint32 groupId);
+static void MarkNodeHasMetadata(char *nodeName, int32 nodePort);
+
+
+PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
+
+
+/*
+ * start_metadata_sync_to_node prepares a worker for accepting MX-table queries. It first
+ * calls EnsureSuperUser() and errors out unless FindWorkerNode() finds the given node.
+ * The worker is then sent a command that sets its local group id (so that it knows which
+ * tuple in pg_dist_node represents itself), followed by the SQL statements that drop and
+ * re-create the metadata about mx distributed tables. Finally, the hasmetadata column of
+ * the target node in pg_dist_node is marked as true.
+ */
+Datum
+start_metadata_sync_to_node(PG_FUNCTION_ARGS)
+{
+	text *nodeName = PG_GETARG_TEXT_P(0);
+	int32 nodePort = PG_GETARG_INT32(1);
+	char *nodeNameString = text_to_cstring(nodeName);
+	char *extensionOwner = CitusExtensionOwnerName();
+
+	WorkerNode *workerNode = NULL;
+	char *localGroupIdUpdateCommand = NULL;
+	List *recreateMetadataSnapshotCommandList = NIL;
+	List *dropMetadataCommandList = NIL;
+	List *createMetadataCommandList = NIL;
+
+	EnsureSuperUser();
+
+	workerNode = FindWorkerNode(nodeNameString, nodePort);
+
+	if (workerNode == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("you cannot sync metadata to a non-existent node"),
+						errhint("First, add the node with SELECT master_add_node(%s,%d)",
+								quote_literal_cstr(nodeNameString), nodePort)));
+	}
+
+	/* generate and add the local group id's update query */
+	localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
+
+	/* generate the queries which drop the metadata */
+	dropMetadataCommandList = MetadataDropCommands();
+
+	/* generate the queries which create the metadata from scratch */
+	createMetadataCommandList = MetadataCreateCommands();
+
+	recreateMetadataSnapshotCommandList = lappend(recreateMetadataSnapshotCommandList,
+												  localGroupIdUpdateCommand);
+	recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
+													  dropMetadataCommandList);
+	recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
+													  createMetadataCommandList);
+
+	/*
+	 * Send the snapshot recreation commands in a single remote transaction and
+	 * error out in any kind of failure. MarkNodeHasMetadata() below is therefore
+	 * reached only after the send has returned without error, so hasmetadata is
+	 * not set for a node whose snapshot could not be recreated.
+	 */
+	SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
+											   recreateMetadataSnapshotCommandList);
+
+	MarkNodeHasMetadata(nodeNameString, nodePort);
+
+	PG_RETURN_VOID();
+}
+
+
 /*
  * ShouldSyncTableMetadata checks if a distributed table has streaming replication model
  * and hash distribution. In that case the distributed table is considered an MX table,
@@ -416,3 +495,76 @@ GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry)
 
 	return commandList;
 }
+
+
+/*
+ * LocalGroupIdUpdateCommand builds the UPDATE statement that sets groupid in
+ * pg_dist_local_group to the given (unsigned) group id and returns it as a C string.
+ */
+static char *
+LocalGroupIdUpdateCommand(uint32 groupId)
+{
+	StringInfo updateCommand = makeStringInfo();
+
+	appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %u",
+					 groupId);
+
+	return updateCommand->data;
+}
+
+
+/*
+ * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in
+ * pg_dist_node to true.
+ */
+static void
+MarkNodeHasMetadata(char *nodeName, int32 nodePort)
+{
+	const bool indexOK = false;
+	const int scanKeyCount = 2;
+
+	Relation pgDistNode = NULL;
+	TupleDesc tupleDescriptor = NULL;
+	ScanKeyData scanKey[scanKeyCount];
+	SysScanDesc scanDescriptor = NULL;
+	HeapTuple heapTuple = NULL;
+	Datum values[Natts_pg_dist_node];
+	bool isnull[Natts_pg_dist_node];
+	bool replace[Natts_pg_dist_node];
+
+	pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
+	tupleDescriptor = RelationGetDescr(pgDistNode);
+	/* nodePort is passed as an int32 datum, so it is matched with int4eq, not int8eq */
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
+				BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
+	ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
+				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
+
+	scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
+										NULL, scanKeyCount, scanKey);
+
+	heapTuple = systable_getnext(scanDescriptor);
+	if (!HeapTupleIsValid(heapTuple))
+	{
+		ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
+							   nodeName, nodePort)));
+	}
+	/* clear replace[] so heap_modify_tuple() takes only hasmetadata from values[] */
+	memset(replace, 0, sizeof(replace));
+
+	values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(true);
+	isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
+	replace[Anum_pg_dist_node_hasmetadata - 1] = true;
+
+	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
+	simple_heap_update(pgDistNode, &heapTuple->t_self, heapTuple);
+
+	CatalogUpdateIndexes(pgDistNode, heapTuple);
+
+	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
+
+	CommandCounterIncrement();
+
+	systable_endscan(scanDescriptor);
+	heap_close(pgDistNode, NoLock);
+}
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index a1c946eb7..4b8753694 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -59,7 +59,6 @@ extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationI
 extern int
GetLocalGroupId(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); -extern void CitusInvalidateNodeCache(void); extern bool CitusHasBeenLoaded(void); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 16940c745..740b40318 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15'; ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; +ALTER EXTENSION citus UPDATE TO '6.1-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index c375969df..4caaaaba8 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15'; ALTER EXTENSION citus UPDATE TO '6.0-16'; ALTER EXTENSION citus UPDATE TO '6.0-17'; ALTER EXTENSION citus UPDATE TO '6.0-18'; +ALTER EXTENSION citus UPDATE TO '6.1-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)