Add start_metadata_sync_to_node UDF

This change adds `start_metadata_sync_to_node` UDF which copies the metadata about nodes and MX tables
from master to the specified worker, sets its local group ID and marks its hasmetadata to true to
allow it receive future DDL changes.
pull/997/head
Eren Basak 2016-11-08 16:35:41 -08:00
parent 21effef8b5
commit 9eff968d1f
7 changed files with 171 additions and 3 deletions

View File

@ -8,7 +8,8 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -94,6 +95,8 @@ $(EXTENSION)--6.0-17.sql: $(EXTENSION)--6.0-16.sql $(EXTENSION)--6.0-16--6.0-17.
cat $^ > $@
$(EXTENSION)--6.0-18.sql: $(EXTENSION)--6.0-17.sql $(EXTENSION)--6.0-17--6.0-18.sql
cat $^ > $@
$(EXTENSION)--6.1-1.sql: $(EXTENSION)--6.0-18.sql $(EXTENSION)--6.0-18--6.1-1.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,12 @@
/* citus--6.0-18--6.1-1.sql */
SET search_path = 'pg_catalog';
CREATE FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$start_metadata_sync_to_node$$;
COMMENT ON FUNCTION start_metadata_sync_to_node(nodename text, nodeport integer)
IS 'sync metadata to node';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-18'
default_version = '6.1-1'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -17,8 +17,12 @@
#include <sys/stat.h>
#include <unistd.h>
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_foreign_server.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/distribution_column.h"
@ -27,14 +31,89 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_node.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
#include "nodes/pg_list.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
static char * LocalGroupIdUpdateCommand(uint32 groupId);
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
/*
* start_metadata_sync_to_node function creates the metadata in a worker for preparing the
* worker for accepting MX-table queries. The function first sets the localGroupId of the
* worker so that the worker knows which tuple in pg_dist_node table represents itself.
* After that, SQL statetemens for re-creating metadata about mx distributed
* tables are sent to the worker. Finally, the hasmetadata column of the target node in
* pg_dist_node is marked as true.
*/
Datum
start_metadata_sync_to_node(PG_FUNCTION_ARGS)
{
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
char *extensionOwner = CitusExtensionOwnerName();
WorkerNode *workerNode = NULL;
char *localGroupIdUpdateCommand = NULL;
List *recreateMetadataSnapshotCommandList = NIL;
List *dropMetadataCommandList = NIL;
List *createMetadataCommandList = NIL;
EnsureSuperUser();
workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("you cannot sync metadata to a non-existent node"),
errhint("First, add the node with SELECT master_add_node(%s,%d)",
nodeNameString, nodePort)));
}
/* generate and add the local group id's update query */
localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
/* generate the queries which drop the metadata */
dropMetadataCommandList = MetadataDropCommands();
/* generate the queries which create the metadata from scratch */
createMetadataCommandList = MetadataCreateCommands();
recreateMetadataSnapshotCommandList = lappend(recreateMetadataSnapshotCommandList,
localGroupIdUpdateCommand);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
dropMetadataCommandList);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
createMetadataCommandList);
/*
* Send the snapshot recreation commands in a single remote transaction and
* error out in any kind of failure. Note that it is not required to send
* createMetadataSnapshotCommandList in the same transaction that we send
* nodeDeleteCommand and nodeInsertCommand commands below.
*/
SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
recreateMetadataSnapshotCommandList);
MarkNodeHasMetadata(nodeNameString, nodePort);
PG_RETURN_VOID();
}
/*
* ShouldSyncTableMetadata checks if a distributed table has streaming replication model
* and hash distribution. In that case the distributed table is considered an MX table,
@ -416,3 +495,76 @@ GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry)
return commandList;
}
/*
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
* of a worker and returns the command in a string.
*/
static char *
LocalGroupIdUpdateCommand(uint32 groupId)
{
StringInfo updateCommand = makeStringInfo();
appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %d",
groupId);
return updateCommand->data;
}
/*
* MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in
* pg_dist_node to true.
*/
static void
MarkNodeHasMetadata(char *nodeName, int32 nodePort)
{
const bool indexOK = false;
const int scanKeyCount = 2;
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
ScanKeyData scanKey[scanKeyCount];
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistNode);
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodePort));
scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
nodeName, nodePort)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(true);
isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
replace[Anum_pg_dist_node_hasmetadata - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
simple_heap_update(pgDistNode, &heapTuple->t_self, heapTuple);
CatalogUpdateIndexes(pgDistNode, heapTuple);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
CommandCounterIncrement();
systable_endscan(scanDescriptor);
heap_close(pgDistNode, NoLock);
}

View File

@ -59,7 +59,6 @@ extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationI
extern int GetLocalGroupId(void);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern void CitusInvalidateNodeCache(void);
extern bool CitusHasBeenLoaded(void);

View File

@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15';
ALTER EXTENSION citus UPDATE TO '6.0-16';
ALTER EXTENSION citus UPDATE TO '6.0-17';
ALTER EXTENSION citus UPDATE TO '6.0-18';
ALTER EXTENSION citus UPDATE TO '6.1-1';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,

View File

@ -58,6 +58,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-15';
ALTER EXTENSION citus UPDATE TO '6.0-16';
ALTER EXTENSION citus UPDATE TO '6.0-17';
ALTER EXTENSION citus UPDATE TO '6.0-18';
ALTER EXTENSION citus UPDATE TO '6.1-1';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)